HBASE-9116. Adds a view/edit tool for favored nodes mapping. Also implements the FavoredNodeLoadBalancer.balanceCluster method.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f6c74ca78d
commit
b936444e34
|
@ -241,4 +241,12 @@
|
|||
<Bug pattern="MS_MUTABLE_ARRAY"/>
|
||||
</Match>
|
||||
|
||||
<Match>
|
||||
<!--
|
||||
The logic explicitly checks equality of two floating point numbers. Ignore the warning
|
||||
!-->
|
||||
<Class name="org.apache.hadoop.hbase.master.AssignmentVerificationReport"/>
|
||||
<Bug pattern="FE_FLOATING_POINT_EQUALITY"/>
|
||||
</Match>
|
||||
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
|
||||
|
@ -714,6 +716,25 @@ public final class RequestConverter {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer UpdateFavoredNodesRequest to update a list of favorednode mappings
|
||||
* @param updateRegionInfos
|
||||
* @return a protocol buffer UpdateFavoredNodesRequest
|
||||
*/
|
||||
public static UpdateFavoredNodesRequest buildUpdateFavoredNodesRequest(
|
||||
final List<Pair<HRegionInfo, List<ServerName>>> updateRegionInfos) {
|
||||
UpdateFavoredNodesRequest.Builder ubuilder = UpdateFavoredNodesRequest.newBuilder();
|
||||
for (Pair<HRegionInfo, List<ServerName>> pair : updateRegionInfos) {
|
||||
RegionUpdateInfo.Builder builder = RegionUpdateInfo.newBuilder();
|
||||
builder.setRegion(HRegionInfo.convert(pair.getFirst()));
|
||||
for (ServerName server : pair.getSecond()) {
|
||||
builder.addFavoredNodes(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
ubuilder.addUpdateInfo(builder.build());
|
||||
}
|
||||
return ubuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a CloseRegionRequest for a given region name
|
||||
*
|
||||
|
@ -1441,4 +1462,4 @@ public final class RequestConverter {
|
|||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -466,6 +466,10 @@ possible configurations would overwhelm and obscure the important.
|
|||
<value>60000</value>
|
||||
<description>Client scanner lease period in milliseconds.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.client.localityCheck.threadPoolSize</name>
|
||||
<value>2</value>
|
||||
</property>
|
||||
|
||||
<!--Miscellaneous configuration-->
|
||||
<property>
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -145,6 +145,19 @@ message CompactRegionRequest {
|
|||
message CompactRegionResponse {
|
||||
}
|
||||
|
||||
message UpdateFavoredNodesRequest {
|
||||
repeated RegionUpdateInfo update_info = 1;
|
||||
|
||||
message RegionUpdateInfo {
|
||||
required RegionInfo region = 1;
|
||||
repeated ServerName favored_nodes = 2;
|
||||
}
|
||||
}
|
||||
|
||||
message UpdateFavoredNodesResponse {
|
||||
optional uint32 response = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the specified regions.
|
||||
* <p>
|
||||
|
@ -251,4 +264,7 @@ service AdminService {
|
|||
|
||||
rpc StopServer(StopServerRequest)
|
||||
returns(StopServerResponse);
|
||||
|
||||
rpc UpdateFavoredNodes(UpdateFavoredNodesRequest)
|
||||
returns(UpdateFavoredNodesResponse);
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
|
|||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
|
||||
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
|
||||
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
|
||||
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
|
||||
|
@ -126,6 +127,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private final TableLockManager tableLockManager;
|
||||
|
||||
private AtomicInteger numRegionsOpened = new AtomicInteger(0);
|
||||
|
||||
final private KeyLocker<String> locker = new KeyLocker<String>();
|
||||
|
||||
/**
|
||||
|
@ -1366,7 +1369,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.warn("A region was opened on a dead server, ServerName=" +
|
||||
sn + ", region=" + regionInfo.getEncodedName());
|
||||
}
|
||||
|
||||
numRegionsOpened.incrementAndGet();
|
||||
regionStates.regionOnline(regionInfo, sn);
|
||||
|
||||
// Remove plan if one.
|
||||
|
@ -2411,6 +2414,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by unit tests. Return the number of regions opened so far in the life
|
||||
* of the master. Increases by one every time the master opens a region
|
||||
* @return the counter value of the number of regions opened so far
|
||||
*/
|
||||
public int getNumRegionsOpened() {
|
||||
return numRegionsOpened.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until the specified region has completed assignment.
|
||||
* <p>
|
||||
|
@ -2558,14 +2570,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
|
||||
// Scan META for all user regions, skipping any disabled tables
|
||||
Map<HRegionInfo, ServerName> allRegions;
|
||||
if (this.shouldAssignRegionsWithFavoredNodes) {
|
||||
allRegions = FavoredNodeAssignmentHelper.fullScan(
|
||||
catalogTracker, disabledOrDisablingOrEnabling, true, (FavoredNodeLoadBalancer)balancer);
|
||||
} else {
|
||||
allRegions = MetaReader.fullScan(
|
||||
catalogTracker, disabledOrDisablingOrEnabling, true);
|
||||
}
|
||||
|
||||
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
|
||||
new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
|
||||
snapshotOfRegionAssignment.initialize();
|
||||
allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
|
||||
if (allRegions == null) return;
|
||||
|
||||
//remove system tables because they would have been assigned earlier
|
||||
|
|
|
@ -0,0 +1,598 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
|
||||
/**
|
||||
* Helper class that is used by {@link RegionPlacementMaintainer} to print
|
||||
* information for favored nodes
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AssignmentVerificationReport {
|
||||
protected static final Log LOG = LogFactory.getLog(
|
||||
AssignmentVerificationReport.class.getName());
|
||||
|
||||
private TableName tableName = null;
|
||||
private boolean enforceLocality = false;
|
||||
private boolean isFilledUp = false;
|
||||
|
||||
private int totalRegions = 0;
|
||||
private int totalRegionServers = 0;
|
||||
// for unassigned regions
|
||||
private List<HRegionInfo> unAssignedRegionsList =
|
||||
new ArrayList<HRegionInfo>();
|
||||
|
||||
// For regions without valid favored nodes
|
||||
private List<HRegionInfo> regionsWithoutValidFavoredNodes =
|
||||
new ArrayList<HRegionInfo>();
|
||||
|
||||
// For regions not running on the favored nodes
|
||||
private List<HRegionInfo> nonFavoredAssignedRegionList =
|
||||
new ArrayList<HRegionInfo>();
|
||||
|
||||
// For regions running on the favored nodes
|
||||
private int totalFavoredAssignments = 0;
|
||||
private int[] favoredNodes = new int[FavoredNodeAssignmentHelper.FAVORED_NODES_NUM];
|
||||
private float[] favoredNodesLocalitySummary =
|
||||
new float[FavoredNodeAssignmentHelper.FAVORED_NODES_NUM];
|
||||
private float actualLocalitySummary = 0;
|
||||
|
||||
// For region balancing information
|
||||
private float avgRegionsOnRS = 0;
|
||||
private int maxRegionsOnRS = 0;
|
||||
private int minRegionsOnRS = Integer.MAX_VALUE;
|
||||
private Set<ServerName> mostLoadedRSSet =
|
||||
new HashSet<ServerName>();
|
||||
private Set<ServerName> leastLoadedRSSet =
|
||||
new HashSet<ServerName>();
|
||||
|
||||
private float avgDispersionScore = 0;
|
||||
private float maxDispersionScore = 0;
|
||||
private Set<ServerName> maxDispersionScoreServerSet =
|
||||
new HashSet<ServerName>();
|
||||
private float minDispersionScore = Float.MAX_VALUE;
|
||||
private Set<ServerName> minDispersionScoreServerSet =
|
||||
new HashSet<ServerName>();
|
||||
|
||||
private float avgDispersionNum = 0;
|
||||
private float maxDispersionNum = 0;
|
||||
private Set<ServerName> maxDispersionNumServerSet =
|
||||
new HashSet<ServerName>();
|
||||
private float minDispersionNum = Float.MAX_VALUE;
|
||||
private Set<ServerName> minDispersionNumServerSet =
|
||||
new HashSet<ServerName>();
|
||||
|
||||
public void fillUp(TableName tableName, SnapshotOfRegionAssignmentFromMeta snapshot,
|
||||
Map<String, Map<String, Float>> regionLocalityMap) {
|
||||
// Set the table name
|
||||
this.tableName = tableName;
|
||||
|
||||
// Get all the regions for this table
|
||||
List<HRegionInfo> regionInfoList =
|
||||
snapshot.getTableToRegionMap().get(tableName);
|
||||
// Get the total region num for the current table
|
||||
this.totalRegions = regionInfoList.size();
|
||||
|
||||
// Get the existing assignment plan
|
||||
FavoredNodesPlan favoredNodesAssignment = snapshot.getExistingAssignmentPlan();
|
||||
// Get the region to region server mapping
|
||||
Map<HRegionInfo, ServerName> currentAssignment =
|
||||
snapshot.getRegionToRegionServerMap();
|
||||
// Initialize the server to its hosing region counter map
|
||||
Map<ServerName, Integer> serverToHostingRegionCounterMap =
|
||||
new HashMap<ServerName, Integer>();
|
||||
|
||||
Map<ServerName, Integer> primaryRSToRegionCounterMap =
|
||||
new HashMap<ServerName, Integer>();
|
||||
Map<ServerName, Set<ServerName>> primaryToSecTerRSMap =
|
||||
new HashMap<ServerName, Set<ServerName>>();
|
||||
|
||||
// Check the favored nodes and its locality information
|
||||
// Also keep tracker of the most loaded and least loaded region servers
|
||||
for (HRegionInfo region : regionInfoList) {
|
||||
try {
|
||||
ServerName currentRS = currentAssignment.get(region);
|
||||
// Handle unassigned regions
|
||||
if (currentRS == null) {
|
||||
unAssignedRegionsList.add(region);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Keep updating the server to is hosting region counter map
|
||||
Integer hostRegionCounter = serverToHostingRegionCounterMap.get(currentRS);
|
||||
if (hostRegionCounter == null) {
|
||||
hostRegionCounter = Integer.valueOf(0);
|
||||
}
|
||||
hostRegionCounter = hostRegionCounter.intValue() + 1;
|
||||
serverToHostingRegionCounterMap.put(currentRS, hostRegionCounter);
|
||||
|
||||
// Get the favored nodes from the assignment plan and verify it.
|
||||
List<ServerName> favoredNodes = favoredNodesAssignment.getFavoredNodes(region);
|
||||
if (favoredNodes == null ||
|
||||
favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
|
||||
regionsWithoutValidFavoredNodes.add(region);
|
||||
continue;
|
||||
}
|
||||
// Get the primary, secondary and tertiary region server
|
||||
ServerName primaryRS =
|
||||
favoredNodes.get(FavoredNodesPlan.Position.PRIMARY.ordinal());
|
||||
ServerName secondaryRS =
|
||||
favoredNodes.get(FavoredNodesPlan.Position.SECONDARY.ordinal());
|
||||
ServerName tertiaryRS =
|
||||
favoredNodes.get(FavoredNodesPlan.Position.TERTIARY.ordinal());
|
||||
|
||||
// Update the primary rs to its region set map
|
||||
Integer regionCounter = primaryRSToRegionCounterMap.get(primaryRS);
|
||||
if (regionCounter == null) {
|
||||
regionCounter = Integer.valueOf(0);
|
||||
}
|
||||
regionCounter = regionCounter.intValue() + 1;
|
||||
primaryRSToRegionCounterMap.put(primaryRS, regionCounter);
|
||||
|
||||
// Update the primary rs to secondary and tertiary rs map
|
||||
Set<ServerName> secAndTerSet = primaryToSecTerRSMap.get(primaryRS);
|
||||
if (secAndTerSet == null) {
|
||||
secAndTerSet = new HashSet<ServerName>();
|
||||
}
|
||||
secAndTerSet.add(secondaryRS);
|
||||
secAndTerSet.add(tertiaryRS);
|
||||
primaryToSecTerRSMap.put(primaryRS, secAndTerSet);
|
||||
|
||||
// Get the position of the current region server in the favored nodes list
|
||||
FavoredNodesPlan.Position favoredNodePosition =
|
||||
FavoredNodesPlan.getFavoredServerPosition(favoredNodes, currentRS);
|
||||
|
||||
// Handle the non favored assignment.
|
||||
if (favoredNodePosition == null) {
|
||||
nonFavoredAssignedRegionList.add(region);
|
||||
continue;
|
||||
}
|
||||
// Increase the favored nodes assignment.
|
||||
this.favoredNodes[favoredNodePosition.ordinal()]++;
|
||||
totalFavoredAssignments++;
|
||||
|
||||
// Summary the locality information for each favored nodes
|
||||
if (regionLocalityMap != null) {
|
||||
// Set the enforce locality as true;
|
||||
this.enforceLocality = true;
|
||||
|
||||
// Get the region degree locality map
|
||||
Map<String, Float> regionDegreeLocalityMap =
|
||||
regionLocalityMap.get(region.getEncodedName());
|
||||
if (regionDegreeLocalityMap == null) {
|
||||
continue; // ignore the region which doesn't have any store files.
|
||||
}
|
||||
|
||||
// Get the locality summary for each favored nodes
|
||||
for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
|
||||
ServerName favoredNode = favoredNodes.get(p.ordinal());
|
||||
// Get the locality for the current favored nodes
|
||||
Float locality =
|
||||
regionDegreeLocalityMap.get(favoredNode.getHostname());
|
||||
if (locality != null) {
|
||||
this.favoredNodesLocalitySummary[p.ordinal()] += locality;
|
||||
}
|
||||
}
|
||||
|
||||
// Get the locality summary for the current region server
|
||||
Float actualLocality =
|
||||
regionDegreeLocalityMap.get(currentRS.getHostname());
|
||||
if (actualLocality != null) {
|
||||
this.actualLocalitySummary += actualLocality;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot verify the region assignment for region " +
|
||||
((region == null) ? " null " : region.getRegionNameAsString()) +
|
||||
"because of " + e);
|
||||
}
|
||||
}
|
||||
|
||||
float dispersionScoreSummary = 0;
|
||||
float dispersionNumSummary = 0;
|
||||
// Calculate the secondary score for each primary region server
|
||||
for (Map.Entry<ServerName, Integer> entry :
|
||||
primaryRSToRegionCounterMap.entrySet()) {
|
||||
ServerName primaryRS = entry.getKey();
|
||||
Integer regionsOnPrimary = entry.getValue();
|
||||
|
||||
// Process the dispersion number and score
|
||||
float dispersionScore = 0;
|
||||
int dispersionNum = 0;
|
||||
if (primaryToSecTerRSMap.get(primaryRS) != null
|
||||
&& regionsOnPrimary.intValue() != 0) {
|
||||
dispersionNum = primaryToSecTerRSMap.get(primaryRS).size();
|
||||
dispersionScore = dispersionNum /
|
||||
((float) regionsOnPrimary.intValue() * 2);
|
||||
}
|
||||
// Update the max dispersion score
|
||||
if (dispersionScore > this.maxDispersionScore) {
|
||||
this.maxDispersionScoreServerSet.clear();
|
||||
this.maxDispersionScoreServerSet.add(primaryRS);
|
||||
this.maxDispersionScore = dispersionScore;
|
||||
} else if (dispersionScore == this.maxDispersionScore) {
|
||||
this.maxDispersionScoreServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
// Update the max dispersion num
|
||||
if (dispersionNum > this.maxDispersionNum) {
|
||||
this.maxDispersionNumServerSet.clear();
|
||||
this.maxDispersionNumServerSet.add(primaryRS);
|
||||
this.maxDispersionNum = dispersionNum;
|
||||
} else if (dispersionNum == this.maxDispersionNum) {
|
||||
this.maxDispersionNumServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
// Update the min dispersion score
|
||||
if (dispersionScore < this.minDispersionScore) {
|
||||
this.minDispersionScoreServerSet.clear();
|
||||
this.minDispersionScoreServerSet.add(primaryRS);
|
||||
this.minDispersionScore = dispersionScore;
|
||||
} else if (dispersionScore == this.minDispersionScore) {
|
||||
this.minDispersionScoreServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
// Update the min dispersion num
|
||||
if (dispersionNum < this.minDispersionNum) {
|
||||
this.minDispersionNumServerSet.clear();
|
||||
this.minDispersionNumServerSet.add(primaryRS);
|
||||
this.minDispersionNum = dispersionNum;
|
||||
} else if (dispersionNum == this.minDispersionNum) {
|
||||
this.minDispersionNumServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
dispersionScoreSummary += dispersionScore;
|
||||
dispersionNumSummary += dispersionNum;
|
||||
}
|
||||
|
||||
// Update the avg dispersion score
|
||||
if (primaryRSToRegionCounterMap.keySet().size() != 0) {
|
||||
this.avgDispersionScore = dispersionScoreSummary /
|
||||
(float) primaryRSToRegionCounterMap.keySet().size();
|
||||
this.avgDispersionNum = dispersionNumSummary /
|
||||
(float) primaryRSToRegionCounterMap.keySet().size();
|
||||
}
|
||||
|
||||
// Fill up the most loaded and least loaded region server information
|
||||
for (Map.Entry<ServerName, Integer> entry :
|
||||
serverToHostingRegionCounterMap.entrySet()) {
|
||||
ServerName currentRS = entry.getKey();
|
||||
int hostRegionCounter = entry.getValue().intValue();
|
||||
|
||||
// Update the most loaded region server list and maxRegionsOnRS
|
||||
if (hostRegionCounter > this.maxRegionsOnRS) {
|
||||
maxRegionsOnRS = hostRegionCounter;
|
||||
this.mostLoadedRSSet.clear();
|
||||
this.mostLoadedRSSet.add(currentRS);
|
||||
} else if (hostRegionCounter == this.maxRegionsOnRS) {
|
||||
this.mostLoadedRSSet.add(currentRS);
|
||||
}
|
||||
|
||||
// Update the least loaded region server list and minRegionsOnRS
|
||||
if (hostRegionCounter < this.minRegionsOnRS) {
|
||||
this.minRegionsOnRS = hostRegionCounter;
|
||||
this.leastLoadedRSSet.clear();
|
||||
this.leastLoadedRSSet.add(currentRS);
|
||||
} else if (hostRegionCounter == this.minRegionsOnRS) {
|
||||
this.leastLoadedRSSet.add(currentRS);
|
||||
}
|
||||
}
|
||||
|
||||
// and total region servers
|
||||
this.totalRegionServers = serverToHostingRegionCounterMap.keySet().size();
|
||||
this.avgRegionsOnRS = (totalRegionServers == 0) ? 0 :
|
||||
(totalRegions / (float) totalRegionServers);
|
||||
// Set the isFilledUp as true
|
||||
isFilledUp = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this to project the dispersion scores
|
||||
* @param tableName
|
||||
* @param snapshot
|
||||
* @param newPlan
|
||||
*/
|
||||
public void fillUpDispersion(TableName tableName,
|
||||
SnapshotOfRegionAssignmentFromMeta snapshot, FavoredNodesPlan newPlan) {
|
||||
// Set the table name
|
||||
this.tableName = tableName;
|
||||
// Get all the regions for this table
|
||||
List<HRegionInfo> regionInfoList = snapshot.getTableToRegionMap().get(
|
||||
tableName);
|
||||
// Get the total region num for the current table
|
||||
this.totalRegions = regionInfoList.size();
|
||||
FavoredNodesPlan plan = null;
|
||||
if (newPlan == null) {
|
||||
plan = snapshot.getExistingAssignmentPlan();
|
||||
} else {
|
||||
plan = newPlan;
|
||||
}
|
||||
// Get the region to region server mapping
|
||||
Map<ServerName, Integer> primaryRSToRegionCounterMap =
|
||||
new HashMap<ServerName, Integer>();
|
||||
Map<ServerName, Set<ServerName>> primaryToSecTerRSMap =
|
||||
new HashMap<ServerName, Set<ServerName>>();
|
||||
|
||||
// Check the favored nodes and its locality information
|
||||
// Also keep tracker of the most loaded and least loaded region servers
|
||||
for (HRegionInfo region : regionInfoList) {
|
||||
try {
|
||||
// Get the favored nodes from the assignment plan and verify it.
|
||||
List<ServerName> favoredNodes = plan.getFavoredNodes(region);
|
||||
if (favoredNodes == null
|
||||
|| favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
|
||||
regionsWithoutValidFavoredNodes.add(region);
|
||||
continue;
|
||||
}
|
||||
// Get the primary, secondary and tertiary region server
|
||||
ServerName primaryRS = favoredNodes
|
||||
.get(FavoredNodesPlan.Position.PRIMARY.ordinal());
|
||||
ServerName secondaryRS = favoredNodes
|
||||
.get(FavoredNodesPlan.Position.SECONDARY.ordinal());
|
||||
ServerName tertiaryRS = favoredNodes
|
||||
.get(FavoredNodesPlan.Position.TERTIARY.ordinal());
|
||||
|
||||
// Update the primary rs to its region set map
|
||||
Integer regionCounter = primaryRSToRegionCounterMap.get(primaryRS);
|
||||
if (regionCounter == null) {
|
||||
regionCounter = Integer.valueOf(0);
|
||||
}
|
||||
regionCounter = regionCounter.intValue() + 1;
|
||||
primaryRSToRegionCounterMap.put(primaryRS, regionCounter);
|
||||
|
||||
// Update the primary rs to secondary and tertiary rs map
|
||||
Set<ServerName> secAndTerSet = primaryToSecTerRSMap.get(primaryRS);
|
||||
if (secAndTerSet == null) {
|
||||
secAndTerSet = new HashSet<ServerName>();
|
||||
}
|
||||
secAndTerSet.add(secondaryRS);
|
||||
secAndTerSet.add(tertiaryRS);
|
||||
primaryToSecTerRSMap.put(primaryRS, secAndTerSet);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot verify the region assignment for region "
|
||||
+ ((region == null) ? " null " : region.getRegionNameAsString())
|
||||
+ "because of " + e);
|
||||
}
|
||||
}
|
||||
float dispersionScoreSummary = 0;
|
||||
float dispersionNumSummary = 0;
|
||||
// Calculate the secondary score for each primary region server
|
||||
for (Map.Entry<ServerName, Integer> entry :
|
||||
primaryRSToRegionCounterMap.entrySet()) {
|
||||
ServerName primaryRS = entry.getKey();
|
||||
Integer regionsOnPrimary = entry.getValue();
|
||||
|
||||
// Process the dispersion number and score
|
||||
float dispersionScore = 0;
|
||||
int dispersionNum = 0;
|
||||
if (primaryToSecTerRSMap.get(primaryRS) != null
|
||||
&& regionsOnPrimary.intValue() != 0) {
|
||||
dispersionNum = primaryToSecTerRSMap.get(primaryRS).size();
|
||||
dispersionScore = dispersionNum /
|
||||
((float) regionsOnPrimary.intValue() * 2);
|
||||
}
|
||||
|
||||
// Update the max dispersion num
|
||||
if (dispersionNum > this.maxDispersionNum) {
|
||||
this.maxDispersionNumServerSet.clear();
|
||||
this.maxDispersionNumServerSet.add(primaryRS);
|
||||
this.maxDispersionNum = dispersionNum;
|
||||
} else if (dispersionNum == this.maxDispersionNum) {
|
||||
this.maxDispersionNumServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
// Update the min dispersion score
|
||||
if (dispersionScore < this.minDispersionScore) {
|
||||
this.minDispersionScoreServerSet.clear();
|
||||
this.minDispersionScoreServerSet.add(primaryRS);
|
||||
this.minDispersionScore = dispersionScore;
|
||||
} else if (dispersionScore == this.minDispersionScore) {
|
||||
this.minDispersionScoreServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
// Update the min dispersion num
|
||||
if (dispersionNum < this.minDispersionNum) {
|
||||
this.minDispersionNumServerSet.clear();
|
||||
this.minDispersionNumServerSet.add(primaryRS);
|
||||
this.minDispersionNum = dispersionNum;
|
||||
} else if (dispersionNum == this.minDispersionNum) {
|
||||
this.minDispersionNumServerSet.add(primaryRS);
|
||||
}
|
||||
|
||||
dispersionScoreSummary += dispersionScore;
|
||||
dispersionNumSummary += dispersionNum;
|
||||
}
|
||||
|
||||
// Update the avg dispersion score
|
||||
if (primaryRSToRegionCounterMap.keySet().size() != 0) {
|
||||
this.avgDispersionScore = dispersionScoreSummary /
|
||||
(float) primaryRSToRegionCounterMap.keySet().size();
|
||||
this.avgDispersionNum = dispersionNumSummary /
|
||||
(float) primaryRSToRegionCounterMap.keySet().size();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list which contains just 3 elements: average dispersion score, max
|
||||
* dispersion score and min dispersion score as first, second and third element
|
||||
* respectively.
|
||||
*
|
||||
*/
|
||||
public List<Float> getDispersionInformation() {
|
||||
List<Float> dispersion = new ArrayList<Float>();
|
||||
dispersion.add(avgDispersionScore);
|
||||
dispersion.add(maxDispersionScore);
|
||||
dispersion.add(minDispersionScore);
|
||||
return dispersion;
|
||||
}
|
||||
|
||||
public void print(boolean isDetailMode) {
|
||||
if (!isFilledUp) {
|
||||
System.err.println("[Error] Region assignment verfication report" +
|
||||
"hasn't been filled up");
|
||||
}
|
||||
DecimalFormat df = new java.text.DecimalFormat( "#.##");
|
||||
|
||||
// Print some basic information
|
||||
System.out.println("Region Assignment Verification for Table: " + tableName +
|
||||
"\n\tTotal regions : " + totalRegions);
|
||||
|
||||
// Print the number of regions on each kinds of the favored nodes
|
||||
System.out.println("\tTotal regions on favored nodes " +
|
||||
totalFavoredAssignments);
|
||||
for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
|
||||
System.out.println("\t\tTotal regions on "+ p.toString() +
|
||||
" region servers: " + favoredNodes[p.ordinal()]);
|
||||
}
|
||||
// Print the number of regions in each kinds of invalid assignment
|
||||
System.out.println("\tTotal unassigned regions: " +
|
||||
unAssignedRegionsList.size());
|
||||
if (isDetailMode) {
|
||||
for (HRegionInfo region : unAssignedRegionsList) {
|
||||
System.out.println("\t\t" + region.getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("\tTotal regions NOT on favored nodes: " +
|
||||
nonFavoredAssignedRegionList.size());
|
||||
if (isDetailMode) {
|
||||
for (HRegionInfo region : nonFavoredAssignedRegionList) {
|
||||
System.out.println("\t\t" + region.getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("\tTotal regions without favored nodes: " +
|
||||
regionsWithoutValidFavoredNodes.size());
|
||||
if (isDetailMode) {
|
||||
for (HRegionInfo region : regionsWithoutValidFavoredNodes) {
|
||||
System.out.println("\t\t" + region.getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
// Print the locality information if enabled
|
||||
if (this.enforceLocality && totalRegions != 0) {
|
||||
// Print the actual locality for this table
|
||||
float actualLocality = 100 *
|
||||
this.actualLocalitySummary / (float) totalRegions;
|
||||
System.out.println("\n\tThe actual avg locality is " +
|
||||
df.format(actualLocality) + " %");
|
||||
|
||||
// Print the expected locality if regions are placed on the each kinds of
|
||||
// favored nodes
|
||||
for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
|
||||
float avgLocality = 100 *
|
||||
(favoredNodesLocalitySummary[p.ordinal()] / (float) totalRegions);
|
||||
System.out.println("\t\tThe expected avg locality if all regions" +
|
||||
" on the " + p.toString() + " region servers: "
|
||||
+ df.format(avgLocality) + " %");
|
||||
}
|
||||
}
|
||||
|
||||
// Print the region balancing information
|
||||
System.out.println("\n\tTotal hosting region servers: " +
|
||||
totalRegionServers);
|
||||
// Print the region balance information
|
||||
if (totalRegionServers != 0) {
|
||||
System.out.println(
|
||||
"\tAvg dispersion num: " +df.format(avgDispersionNum) +
|
||||
" hosts;\tMax dispersion num: " + df.format(maxDispersionNum) +
|
||||
" hosts;\tMin dispersion num: " + df.format(minDispersionNum) +
|
||||
" hosts;");
|
||||
|
||||
System.out.println("\t\tThe number of the region servers with the max" +
|
||||
" dispersion num: " + this.maxDispersionNumServerSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(maxDispersionNumServerSet);
|
||||
}
|
||||
|
||||
System.out.println("\t\tThe number of the region servers with the min" +
|
||||
" dispersion num: " + this.minDispersionNumServerSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(maxDispersionNumServerSet);
|
||||
}
|
||||
|
||||
System.out.println(
|
||||
"\tAvg dispersion score: " + df.format(avgDispersionScore) +
|
||||
";\tMax dispersion score: " + df.format(maxDispersionScore) +
|
||||
";\tMin dispersion score: " + df.format(minDispersionScore) + ";");
|
||||
|
||||
System.out.println("\t\tThe number of the region servers with the max" +
|
||||
" dispersion score: " + this.maxDispersionScoreServerSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(maxDispersionScoreServerSet);
|
||||
}
|
||||
|
||||
System.out.println("\t\tThe number of the region servers with the min" +
|
||||
" dispersion score: " + this.minDispersionScoreServerSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(minDispersionScoreServerSet);
|
||||
}
|
||||
|
||||
System.out.println(
|
||||
"\tAvg regions/region server: " + df.format(avgRegionsOnRS) +
|
||||
";\tMax regions/region server: " + maxRegionsOnRS +
|
||||
";\tMin regions/region server: " + minRegionsOnRS + ";");
|
||||
|
||||
// Print the details about the most loaded region servers
|
||||
System.out.println("\t\tThe number of the most loaded region servers: "
|
||||
+ mostLoadedRSSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(mostLoadedRSSet);
|
||||
}
|
||||
|
||||
// Print the details about the least loaded region servers
|
||||
System.out.println("\t\tThe number of the least loaded region servers: "
|
||||
+ leastLoadedRSSet.size());
|
||||
if (isDetailMode) {
|
||||
printHServerAddressSet(leastLoadedRSSet);
|
||||
}
|
||||
}
|
||||
System.out.println("==============================");
|
||||
}
|
||||
|
||||
private void printHServerAddressSet(Set<ServerName> serverSet) {
|
||||
if (serverSet == null) {
|
||||
return ;
|
||||
}
|
||||
int i = 0;
|
||||
for (ServerName addr : serverSet){
|
||||
if ((i++) % 3 == 0) {
|
||||
System.out.print("\n\t\t\t");
|
||||
}
|
||||
System.out.print(addr.getHostAndPort() + " ; ");
|
||||
}
|
||||
System.out.println("\n");
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,217 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* Used internally for reading meta and constructing datastructures that are
|
||||
* then queried, for things like regions to regionservers, table to regions, etc.
|
||||
* It also records the favored nodes mapping for regions.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SnapshotOfRegionAssignmentFromMeta {
|
||||
private static final Log LOG = LogFactory.getLog(SnapshotOfRegionAssignmentFromMeta.class
|
||||
.getName());
|
||||
|
||||
private CatalogTracker tracker;
|
||||
|
||||
/** the table name to region map */
|
||||
private final Map<TableName, List<HRegionInfo>> tableToRegionMap;
|
||||
/** the region to region server map */
|
||||
//private final Map<HRegionInfo, ServerName> regionToRegionServerMap;
|
||||
private Map<HRegionInfo, ServerName> regionToRegionServerMap;
|
||||
/** the region name to region info map */
|
||||
private final Map<String, HRegionInfo> regionNameToRegionInfoMap;
|
||||
|
||||
/** the regionServer to region map */
|
||||
private final Map<ServerName, List<HRegionInfo>> regionServerToRegionMap;
|
||||
/** the existing assignment plan in the META region */
|
||||
private final FavoredNodesPlan existingAssignmentPlan;
|
||||
private final Set<TableName> disabledTables;
|
||||
private final boolean excludeOfflinedSplitParents;
|
||||
|
||||
public SnapshotOfRegionAssignmentFromMeta(CatalogTracker tracker) {
|
||||
this(tracker, new HashSet<TableName>(), false);
|
||||
}
|
||||
|
||||
public SnapshotOfRegionAssignmentFromMeta(CatalogTracker tracker, Set<TableName> disabledTables,
|
||||
boolean excludeOfflinedSplitParents) {
|
||||
this.tracker = tracker;
|
||||
tableToRegionMap = new HashMap<TableName, List<HRegionInfo>>();
|
||||
regionToRegionServerMap = new HashMap<HRegionInfo, ServerName>();
|
||||
regionServerToRegionMap = new HashMap<ServerName, List<HRegionInfo>>();
|
||||
regionNameToRegionInfoMap = new TreeMap<String, HRegionInfo>();
|
||||
existingAssignmentPlan = new FavoredNodesPlan();
|
||||
this.disabledTables = disabledTables;
|
||||
this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the region assignment snapshot by scanning the META table
|
||||
* @throws IOException
|
||||
*/
|
||||
public void initialize() throws IOException {
|
||||
LOG.info("Start to scan the META for the current region assignment " +
|
||||
"snappshot");
|
||||
// TODO: at some point this code could live in the MetaReader
|
||||
Visitor v = new Visitor() {
|
||||
@Override
|
||||
public boolean visit(Result result) throws IOException {
|
||||
try {
|
||||
if (result == null || result.isEmpty()) return true;
|
||||
Pair<HRegionInfo, ServerName> regionAndServer =
|
||||
HRegionInfo.getHRegionInfoAndServerName(result);
|
||||
HRegionInfo hri = regionAndServer.getFirst();
|
||||
if (hri == null) return true;
|
||||
if (hri.getTableName() == null) return true;
|
||||
if (disabledTables.contains(hri.getTableName())) {
|
||||
return true;
|
||||
}
|
||||
// Are we to include split parents in the list?
|
||||
if (excludeOfflinedSplitParents && hri.isSplit()) return true;
|
||||
// Add the current assignment to the snapshot
|
||||
addAssignment(hri, regionAndServer.getSecond());
|
||||
addRegion(hri);
|
||||
|
||||
// the code below is to handle favored nodes
|
||||
byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
|
||||
FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
|
||||
if (favoredNodes == null) return true;
|
||||
// Add the favored nodes into assignment plan
|
||||
ServerName[] favoredServerList =
|
||||
FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
|
||||
// Add the favored nodes into assignment plan
|
||||
existingAssignmentPlan.updateFavoredNodesMap(hri,
|
||||
Arrays.asList(favoredServerList));
|
||||
return true;
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Catche remote exception " + e.getMessage() +
|
||||
" when processing" + result);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
};
|
||||
// Scan .META. to pick up user regions
|
||||
MetaReader.fullScan(tracker, v);
|
||||
//regionToRegionServerMap = regions;
|
||||
LOG.info("Finished to scan the META for the current region assignment" +
|
||||
"snapshot");
|
||||
}
|
||||
|
||||
private void addRegion(HRegionInfo regionInfo) {
|
||||
// Process the region name to region info map
|
||||
regionNameToRegionInfoMap.put(regionInfo.getRegionNameAsString(), regionInfo);
|
||||
|
||||
// Process the table to region map
|
||||
TableName tableName = regionInfo.getTableName();
|
||||
List<HRegionInfo> regionList = tableToRegionMap.get(tableName);
|
||||
if (regionList == null) {
|
||||
regionList = new ArrayList<HRegionInfo>();
|
||||
}
|
||||
// Add the current region info into the tableToRegionMap
|
||||
regionList.add(regionInfo);
|
||||
tableToRegionMap.put(tableName, regionList);
|
||||
}
|
||||
|
||||
private void addAssignment(HRegionInfo regionInfo, ServerName server) {
|
||||
// Process the region to region server map
|
||||
regionToRegionServerMap.put(regionInfo, server);
|
||||
|
||||
// Process the region server to region map
|
||||
List<HRegionInfo> regionList = regionServerToRegionMap.get(server);
|
||||
if (regionList == null) {
|
||||
regionList = new ArrayList<HRegionInfo>();
|
||||
}
|
||||
regionList.add(regionInfo);
|
||||
regionServerToRegionMap.put(server, regionList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the regioninfo for a region
|
||||
* @return the regioninfo
|
||||
*/
|
||||
public Map<String, HRegionInfo> getRegionNameToRegionInfoMap() {
|
||||
return this.regionNameToRegionInfoMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get regions for tables
|
||||
* @return a mapping from table to regions
|
||||
*/
|
||||
public Map<TableName, List<HRegionInfo>> getTableToRegionMap() {
|
||||
return tableToRegionMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get region to region server map
|
||||
* @return region to region server map
|
||||
*/
|
||||
public Map<HRegionInfo, ServerName> getRegionToRegionServerMap() {
|
||||
return regionToRegionServerMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get regionserver to region map
|
||||
* @return regionserver to region map
|
||||
*/
|
||||
public Map<ServerName, List<HRegionInfo>> getRegionServerToRegionMap() {
|
||||
return regionServerToRegionMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the favored nodes plan
|
||||
* @return the existing favored nodes plan
|
||||
*/
|
||||
public FavoredNodesPlan getExistingAssignmentPlan() {
|
||||
return this.existingAssignmentPlan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the table set
|
||||
* @return the table set
|
||||
*/
|
||||
public Set<TableName> getTableSet() {
|
||||
return this.tableToRegionMap.keySet();
|
||||
}
|
||||
}
|
|
@ -21,14 +21,13 @@ package org.apache.hadoop.hbase.master.balancer;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -40,17 +39,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.FavoredNodes;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
|
@ -88,55 +84,11 @@ public class FavoredNodeAssignmentHelper {
|
|||
}
|
||||
|
||||
/**
|
||||
* Perform full scan of the meta table similar to
|
||||
* {@link MetaReader#fullScan(CatalogTracker, Set, boolean)} except that this is
|
||||
* aware of the favored nodes
|
||||
* Update meta table with favored nodes info
|
||||
* @param regionToFavoredNodes
|
||||
* @param catalogTracker
|
||||
* @param disabledTables
|
||||
* @param excludeOfflinedSplitParents
|
||||
* @param balancer required because we need to let the balancer know about the
|
||||
* current favored nodes from meta scan
|
||||
* @return Returns a map of every region to it's currently assigned server,
|
||||
* according to META. If the region does not have an assignment it will have
|
||||
* a null value in the map.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Map<HRegionInfo, ServerName> fullScan(
|
||||
CatalogTracker catalogTracker, final Set<TableName> disabledTables,
|
||||
final boolean excludeOfflinedSplitParents,
|
||||
FavoredNodeLoadBalancer balancer) throws IOException {
|
||||
final Map<HRegionInfo, ServerName> regions =
|
||||
new TreeMap<HRegionInfo, ServerName>();
|
||||
final Map<HRegionInfo, ServerName[]> favoredNodesMap =
|
||||
new HashMap<HRegionInfo, ServerName[]>();
|
||||
Visitor v = new Visitor() {
|
||||
@Override
|
||||
public boolean visit(Result r) throws IOException {
|
||||
if (r == null || r.isEmpty()) return true;
|
||||
Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(r);
|
||||
HRegionInfo hri = region.getFirst();
|
||||
if (hri == null) return true;
|
||||
if (hri.getTableName() == null) return true;
|
||||
if (disabledTables.contains(
|
||||
hri.getTableName())) return true;
|
||||
// Are we to include split parents in the list?
|
||||
if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
|
||||
regions.put(hri, region.getSecond());
|
||||
byte[] favoredNodes = r.getValue(HConstants.CATALOG_FAMILY,
|
||||
FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
|
||||
if (favoredNodes != null) {
|
||||
ServerName[] favoredServerList =
|
||||
FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
|
||||
favoredNodesMap.put(hri, favoredServerList);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
MetaReader.fullScan(catalogTracker, v);
|
||||
balancer.noteFavoredNodes(favoredNodesMap);
|
||||
return regions;
|
||||
}
|
||||
|
||||
public static void updateMetaWithFavoredNodesInfo(
|
||||
Map<HRegionInfo, List<ServerName>> regionToFavoredNodes,
|
||||
CatalogTracker catalogTracker) throws IOException {
|
||||
|
@ -151,6 +103,33 @@ public class FavoredNodeAssignmentHelper {
|
|||
LOG.info("Added " + puts.size() + " regions in META");
|
||||
}
|
||||
|
||||
/**
|
||||
* Update meta table with favored nodes info
|
||||
* @param regionToFavoredNodes
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void updateMetaWithFavoredNodesInfo(
|
||||
Map<HRegionInfo, List<ServerName>> regionToFavoredNodes,
|
||||
Configuration conf) throws IOException {
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
|
||||
Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
|
||||
if (put != null) {
|
||||
puts.add(put);
|
||||
}
|
||||
}
|
||||
// Write the region assignments to the meta table.
|
||||
HTable metaTable = null;
|
||||
try {
|
||||
metaTable = new HTable(conf, TableName.META_TABLE_NAME);
|
||||
metaTable.put(puts);
|
||||
} finally {
|
||||
if (metaTable != null) metaTable.close();
|
||||
}
|
||||
LOG.info("Added " + puts.size() + " regions in META");
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates and returns a Put containing the region info for the catalog table
|
||||
* and the servers
|
||||
|
@ -190,10 +169,10 @@ public class FavoredNodeAssignmentHelper {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param serverList
|
||||
* @param serverAddrList
|
||||
* @return PB'ed bytes of {@link FavoredNodes} generated by the server list.
|
||||
*/
|
||||
static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
|
||||
public static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
|
||||
FavoredNodes.Builder f = FavoredNodes.newBuilder();
|
||||
for (ServerName s : serverAddrList) {
|
||||
HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
|
||||
|
@ -296,13 +275,164 @@ public class FavoredNodeAssignmentHelper {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Cannot place the favored nodes for region " +
|
||||
regionInfo.getRegionNameAsString() + " because " + e);
|
||||
regionInfo.getRegionNameAsString() + " because " + e, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return secondaryAndTertiaryMap;
|
||||
}
|
||||
|
||||
private Map<ServerName, Set<HRegionInfo>> mapRSToPrimaries(
|
||||
Map<HRegionInfo, ServerName> primaryRSMap) {
|
||||
Map<ServerName, Set<HRegionInfo>> primaryServerMap =
|
||||
new HashMap<ServerName, Set<HRegionInfo>>();
|
||||
for (Entry<HRegionInfo, ServerName> e : primaryRSMap.entrySet()) {
|
||||
Set<HRegionInfo> currentSet = primaryServerMap.get(e.getValue());
|
||||
if (currentSet == null) {
|
||||
currentSet = new HashSet<HRegionInfo>();
|
||||
}
|
||||
currentSet.add(e.getKey());
|
||||
primaryServerMap.put(e.getValue(), currentSet);
|
||||
}
|
||||
return primaryServerMap;
|
||||
}
|
||||
|
||||
/**
 * For regions that share the primary, avoid placing the secondary and tertiary
 * on a same RS. Used for generating new assignments for the
 * primary/secondary/tertiary RegionServers.
 * Placement failures for individual regions are logged and skipped; such
 * regions are simply absent from the returned map.
 * @param primaryRSMap map from each region to its chosen primary region server
 * @return the map of regions to the servers the region-files should be hosted on
 */
public Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryWithRestrictions(
    Map<HRegionInfo, ServerName> primaryRSMap) {
  // Invert the input so we can see, per primary server, which regions share it
  Map<ServerName, Set<HRegionInfo>> serverToPrimaries =
      mapRSToPrimaries(primaryRSMap);
  Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap =
      new HashMap<HRegionInfo, ServerName[]>();

  for (Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
    // Get the target region and its primary region server rack
    HRegionInfo regionInfo = entry.getKey();
    ServerName primaryRS = entry.getValue();
    try {
      // Get the rack for the primary region server
      String primaryRack = rackManager.getRack(primaryRS);
      ServerName[] favoredNodes = null;
      if (getTotalNumberOfRacks() == 1) {
        // Single rack case: have to pick the secondary and tertiary
        // from the same rack
        favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
      } else {
        // Multi-rack: also respect the restriction that regions sharing a
        // primary should not share a secondary/tertiary server
        favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries,
            secondaryAndTertiaryMap, primaryRack, primaryRS, regionInfo);
      }
      if (favoredNodes != null) {
        secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
        LOG.debug("Place the secondary and tertiary region server for region "
            + regionInfo.getRegionNameAsString());
      }
    } catch (Exception e) {
      // best-effort: a failure for one region must not abort the whole pass
      LOG.warn("Cannot place the favored nodes for region "
          + regionInfo.getRegionNameAsString() + " because " + e, e);
      continue;
    }
  }
  return secondaryAndTertiaryMap;
}
|
||||
|
||||
private ServerName[] multiRackCaseWithRestrictions(
|
||||
Map<ServerName, Set<HRegionInfo>> serverToPrimaries,
|
||||
Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap,
|
||||
String primaryRack, ServerName primaryRS, HRegionInfo regionInfo) throws IOException {
|
||||
// Random to choose the secondary and tertiary region server
|
||||
// from another rack to place the secondary and tertiary
|
||||
// Random to choose one rack except for the current rack
|
||||
Set<String> rackSkipSet = new HashSet<String>();
|
||||
rackSkipSet.add(primaryRack);
|
||||
String secondaryRack = getOneRandomRack(rackSkipSet);
|
||||
List<ServerName> serverList = getServersFromRack(secondaryRack);
|
||||
Set<ServerName> serverSet = new HashSet<ServerName>();
|
||||
serverSet.addAll(serverList);
|
||||
ServerName[] favoredNodes;
|
||||
if (serverList.size() >= 2) {
|
||||
// Randomly pick up two servers from this secondary rack
|
||||
// Skip the secondary for the tertiary placement
|
||||
// skip the servers which share the primary already
|
||||
Set<HRegionInfo> primaries = serverToPrimaries.get(primaryRS);
|
||||
Set<ServerName> skipServerSet = new HashSet<ServerName>();
|
||||
while (true) {
|
||||
ServerName[] secondaryAndTertiary = null;
|
||||
if (primaries.size() > 1) {
|
||||
// check where his tertiary and secondary are
|
||||
for (HRegionInfo primary : primaries) {
|
||||
secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
|
||||
if (secondaryAndTertiary != null) {
|
||||
if (regionServerToRackMap.get(secondaryAndTertiary[0]).equals(secondaryRack)) {
|
||||
skipServerSet.add(secondaryAndTertiary[0]);
|
||||
}
|
||||
if (regionServerToRackMap.get(secondaryAndTertiary[1]).equals(secondaryRack)) {
|
||||
skipServerSet.add(secondaryAndTertiary[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (skipServerSet.size() + 2 <= serverSet.size())
|
||||
break;
|
||||
skipServerSet.clear();
|
||||
rackSkipSet.add(secondaryRack);
|
||||
// we used all racks
|
||||
if (rackSkipSet.size() == getTotalNumberOfRacks()) {
|
||||
// remove the last two added and break
|
||||
skipServerSet.remove(secondaryAndTertiary[0]);
|
||||
skipServerSet.remove(secondaryAndTertiary[1]);
|
||||
break;
|
||||
}
|
||||
secondaryRack = getOneRandomRack(rackSkipSet);
|
||||
serverList = getServersFromRack(secondaryRack);
|
||||
serverSet = new HashSet<ServerName>();
|
||||
serverSet.addAll(serverList);
|
||||
}
|
||||
|
||||
// Place the secondary RS
|
||||
ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
|
||||
skipServerSet.add(secondaryRS);
|
||||
// Place the tertiary RS
|
||||
ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);
|
||||
|
||||
if (secondaryRS == null || tertiaryRS == null) {
|
||||
LOG.error("Cannot place the secondary and tertiary"
|
||||
+ " region server for region "
|
||||
+ regionInfo.getRegionNameAsString());
|
||||
}
|
||||
// Create the secondary and tertiary pair
|
||||
favoredNodes = new ServerName[2];
|
||||
favoredNodes[0] = secondaryRS;
|
||||
favoredNodes[1] = tertiaryRS;
|
||||
} else {
|
||||
// Pick the secondary rs from this secondary rack
|
||||
// and pick the tertiary from another random rack
|
||||
favoredNodes = new ServerName[2];
|
||||
ServerName secondary = getOneRandomServer(secondaryRack);
|
||||
favoredNodes[0] = secondary;
|
||||
|
||||
// Pick the tertiary
|
||||
if (getTotalNumberOfRacks() == 2) {
|
||||
// Pick the tertiary from the same rack of the primary RS
|
||||
Set<ServerName> serverSkipSet = new HashSet<ServerName>();
|
||||
serverSkipSet.add(primaryRS);
|
||||
favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
|
||||
} else {
|
||||
// Pick the tertiary from another rack
|
||||
rackSkipSet.add(secondaryRack);
|
||||
String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
|
||||
favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
|
||||
}
|
||||
}
|
||||
return favoredNodes;
|
||||
}
|
||||
|
||||
private ServerName[] singleRackCase(HRegionInfo regionInfo,
|
||||
ServerName primaryRS,
|
||||
String primaryRack) throws IOException {
|
||||
|
@ -311,7 +441,7 @@ public class FavoredNodeAssignmentHelper {
|
|||
List<ServerName> serverList = getServersFromRack(primaryRack);
|
||||
if (serverList.size() <= 2) {
|
||||
// Single region server case: cannot not place the favored nodes
|
||||
// on any server; !domain.canPlaceFavoredNodes()
|
||||
// on any server;
|
||||
return null;
|
||||
} else {
|
||||
// Randomly select two region servers from the server list and make sure
|
||||
|
@ -400,7 +530,7 @@ public class FavoredNodeAssignmentHelper {
|
|||
return (serverSize >= FAVORED_NODES_NUM);
|
||||
}
|
||||
|
||||
void initialize() {
|
||||
public void initialize() {
|
||||
for (ServerName sn : this.servers) {
|
||||
String rackName = this.rackManager.getRack(sn);
|
||||
List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
|
||||
|
@ -462,4 +592,14 @@ public class FavoredNodeAssignmentHelper {
|
|||
|
||||
return randomRack;
|
||||
}
|
||||
|
||||
public static String getFavoredNodesAsString(List<ServerName> nodes) {
|
||||
StringBuffer strBuf = new StringBuffer();
|
||||
int i = 0;
|
||||
for (ServerName node : nodes) {
|
||||
strBuf.append(node.getHostAndPort());
|
||||
if (++i != nodes.size()) strBuf.append(";");
|
||||
}
|
||||
return strBuf.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.balancer;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -30,12 +29,15 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerLoad;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodes.Position;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan.Position;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
|
@ -55,23 +57,91 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
|
||||
private static final Log LOG = LogFactory.getLog(FavoredNodeLoadBalancer.class);
|
||||
|
||||
private FavoredNodes globalFavoredNodesAssignmentPlan;
|
||||
private FavoredNodesPlan globalFavoredNodesAssignmentPlan;
|
||||
private RackManager rackManager;
|
||||
Configuration conf;
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
globalFavoredNodesAssignmentPlan = new FavoredNodes();
|
||||
globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
|
||||
this.rackManager = new RackManager(conf);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
|
||||
//TODO. At a high level, this should look at the block locality per region, and
|
||||
//then reassign regions based on which nodes have the most blocks of the region
|
||||
//file(s). There could be different ways like minimize region movement, or, maximum
|
||||
//locality, etc. The other dimension to look at is whether Stochastic loadbalancer
|
||||
//can be integrated with this
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
/**
 * Produces region moves that push each user region back onto one of its
 * favored nodes: onto the primary when it is online, otherwise onto the
 * less-loaded of the live secondary/tertiary. Regions already on a favored
 * node, and system-namespace regions, are left alone. Favored nodes are
 * refreshed from META at the start of each invocation.
 * @param clusterState current mapping of region server to hosted regions
 * @return the list of region plans to execute (empty if META cannot be read)
 */
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
  //TODO. Look at is whether Stochastic loadbalancer can be integrated with this
  List<RegionPlan> plans = new ArrayList<RegionPlan>();
  //perform a scan of the meta to get the latest updates (if any)
  SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
      new SnapshotOfRegionAssignmentFromMeta(super.services.getCatalogTracker());
  try {
    snaphotOfRegionAssignment.initialize();
  } catch (IOException ie) {
    // balancing without fresh favored-node data would be wrong; bail out
    LOG.warn("Not running balancer since exception was thrown " + ie);
    return plans;
  }
  globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
  // META stores favored nodes WITHOUT startcodes, while clusterState keys
  // have them; build both translations so the two name forms can be compared
  Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
      new HashMap<ServerName, ServerName>();
  Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
      new HashMap<ServerName, ServerName>();
  ServerManager serverMgr = super.services.getServerManager();
  for (ServerName sn: serverMgr.getOnlineServersList()) {
    ServerName s = new ServerName(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
    serverNameToServerNameWithoutCode.put(sn, s);
    serverNameWithoutCodeToServerName.put(s, sn);
  }
  for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
    ServerName currentServer = entry.getKey();
    //get a server without the startcode for the currentServer
    ServerName currentServerWithoutStartCode = new ServerName(currentServer.getHostname(),
        currentServer.getPort(), ServerName.NON_STARTCODE);
    List<HRegionInfo> list = entry.getValue();
    for (HRegionInfo region : list) {
      // never move system-namespace regions via this balancer
      if(region.getTableName().getNamespaceAsString()
          .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
        continue;
      }
      List<ServerName> favoredNodes = globalFavoredNodesAssignmentPlan.getFavoredNodes(region);
      if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
        continue; //either favorednodes does not exist or we are already on the primary node
      }
      ServerName destination = null;
      //check whether the primary is available
      destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
      if (destination == null) {
        //check whether the region is on secondary/tertiary
        if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
            currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
          continue;
        }
        //the region is currently on none of the favored nodes
        //get it on one of them if possible
        // getLoad returns null for servers that are not online, so a null
        // ServerLoad doubles as an availability check here
        ServerLoad l1 = super.services.getServerManager().getLoad(
            serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
        ServerLoad l2 = super.services.getServerManager().getLoad(
            serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
        if (l1 != null && l2 != null) {
          // both alive: prefer the less loaded of the two
          if (l1.getLoad() > l2.getLoad()) {
            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
          } else {
            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
          }
        } else if (l1 != null) {
          destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
        } else if (l2 != null) {
          destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
        }
        // if none of the favored nodes is alive, destination stays null and
        // the region is left where it is
      }

      if (destination != null) {
        RegionPlan plan = new RegionPlan(region, currentServer, destination);
        plans.add(plan);
      }
    }
  }
  return plans;
}
|
||||
|
||||
@Override
|
||||
|
@ -168,8 +238,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
|
|||
for (ServerName s : favoredNodes) {
|
||||
ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
|
||||
if (serverWithLegitStartCode != null) {
|
||||
FavoredNodes.Position position =
|
||||
FavoredNodes.getFavoredServerPosition(favoredNodes, s);
|
||||
FavoredNodesPlan.Position position =
|
||||
FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
|
||||
if (Position.PRIMARY.equals(position)) {
|
||||
primaryHost = serverWithLegitStartCode;
|
||||
} else if (Position.SECONDARY.equals(position)) {
|
||||
|
@ -243,7 +313,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
|
||||
Map<ServerName, List<HRegionInfo>> assignmentMap,
|
||||
List<HRegionInfo> regions, List<ServerName> servers) throws IOException {
|
||||
List<HRegionInfo> regions, List<ServerName> servers) {
|
||||
Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<HRegionInfo, ServerName>();
|
||||
// figure the primary RSs
|
||||
assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
|
||||
|
@ -274,12 +344,4 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
|
|||
globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(region, favoredNodesForRegion);
|
||||
}
|
||||
}
|
||||
|
||||
void noteFavoredNodes(final Map<HRegionInfo, ServerName[]> favoredNodesMap) {
|
||||
for (Map.Entry<HRegionInfo, ServerName[]> entry : favoredNodesMap.entrySet()) {
|
||||
// the META should already have favorednode ServerName objects without startcode
|
||||
globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(entry.getKey(),
|
||||
Arrays.asList(entry.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.jboss.netty.util.internal.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* This class contains the mapping information between each region and
|
||||
* its favored region server list. Used by {@link FavoredNodeLoadBalancer} set
|
||||
* of classes and from unit tests (hence the class is public)
|
||||
*
|
||||
* All the access to this class is thread-safe.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FavoredNodesPlan {
|
||||
protected static final Log LOG = LogFactory.getLog(
|
||||
FavoredNodesPlan.class.getName());
|
||||
|
||||
/** the map between each region and its favored region server list */
|
||||
private Map<HRegionInfo, List<ServerName>> favoredNodesMap;
|
||||
|
||||
public static enum Position {
|
||||
PRIMARY,
|
||||
SECONDARY,
|
||||
TERTIARY;
|
||||
};
|
||||
|
||||
public FavoredNodesPlan() {
|
||||
favoredNodesMap = new ConcurrentHashMap<HRegionInfo, List<ServerName>>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an assignment to the plan
|
||||
* @param region
|
||||
* @param servers
|
||||
*/
|
||||
public synchronized void updateFavoredNodesMap(HRegionInfo region,
|
||||
List<ServerName> servers) {
|
||||
if (region == null || servers == null || servers.size() ==0)
|
||||
return;
|
||||
this.favoredNodesMap.put(region, servers);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param region
|
||||
* @return the list of favored region server for this region based on the plan
|
||||
*/
|
||||
public synchronized List<ServerName> getFavoredNodes(HRegionInfo region) {
|
||||
return favoredNodesMap.get(region);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the position of the server in the favoredNodes list. Assumes the
|
||||
* favoredNodes list is of size 3.
|
||||
* @param favoredNodes
|
||||
* @param server
|
||||
* @return position
|
||||
*/
|
||||
public static Position getFavoredServerPosition(
|
||||
List<ServerName> favoredNodes, ServerName server) {
|
||||
if (favoredNodes == null || server == null ||
|
||||
favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
|
||||
return null;
|
||||
}
|
||||
for (Position p : Position.values()) {
|
||||
if (favoredNodes.get(p.ordinal()).equals(server)) {
|
||||
return p;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the mapping between each region to its favored region server list
|
||||
*/
|
||||
public synchronized Map<HRegionInfo, List<ServerName>> getAssignmentMap() {
|
||||
return this.favoredNodesMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an assignment to the plan
|
||||
* @param region
|
||||
* @param servers
|
||||
*/
|
||||
public synchronized void updateAssignmentPlan(HRegionInfo region,
|
||||
List<ServerName> servers) {
|
||||
if (region == null || servers == null || servers.size() ==0)
|
||||
return;
|
||||
this.favoredNodesMap.put(region, servers);
|
||||
LOG.info("Update the assignment plan for region " +
|
||||
region.getRegionNameAsString() + " ; favored nodes " +
|
||||
FavoredNodeAssignmentHelper.getFavoredNodesAsString(servers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
// To compare the map from objec o is identical to current assignment map.
|
||||
Map<HRegionInfo, List<ServerName>> comparedMap=
|
||||
((FavoredNodesPlan)o).getAssignmentMap();
|
||||
|
||||
// compare the size
|
||||
if (comparedMap.size() != this.favoredNodesMap.size())
|
||||
return false;
|
||||
|
||||
// compare each element in the assignment map
|
||||
for (Map.Entry<HRegionInfo, List<ServerName>> entry :
|
||||
comparedMap.entrySet()) {
|
||||
List<ServerName> serverList = this.favoredNodesMap.get(entry.getKey());
|
||||
if (serverList == null && entry.getValue() != null) {
|
||||
return false;
|
||||
} else if (serverList != null && !serverList.equals(entry.getValue())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return favoredNodesMap.hashCode();
|
||||
}
|
||||
}
|
|
@ -145,6 +145,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||
|
@ -4341,4 +4343,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
|
||||
UpdateFavoredNodesRequest request) throws ServiceException {
|
||||
List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
|
||||
UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
|
||||
for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
|
||||
HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
|
||||
updateRegionFavoredNodesMapping(hri.getEncodedName(),
|
||||
regionUpdateInfo.getFavoredNodesList());
|
||||
}
|
||||
respBuilder.setResponse(openInfoList.size());
|
||||
return respBuilder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* Thread that walks over the filesystem, and computes the mappings
|
||||
* <Region -> BestHost> and <Region -> Map<HostName, fractional-locality-of-region>>
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class FSRegionScanner implements Runnable {
|
||||
static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
|
||||
|
||||
private Path regionPath;
|
||||
|
||||
/**
|
||||
* The file system used
|
||||
*/
|
||||
private FileSystem fs;
|
||||
|
||||
/**
|
||||
* Maps each region to the RS with highest locality for that region.
|
||||
*/
|
||||
private Map<String,String> regionToBestLocalityRSMapping;
|
||||
|
||||
/**
|
||||
* Maps region encoded names to maps of hostnames to fractional locality of
|
||||
* that region on that host.
|
||||
*/
|
||||
private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
|
||||
|
||||
FSRegionScanner(FileSystem fs, Path regionPath,
|
||||
Map<String, String> regionToBestLocalityRSMapping,
|
||||
Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
|
||||
this.fs = fs;
|
||||
this.regionPath = regionPath;
|
||||
this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
|
||||
this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// empty the map for each region
|
||||
Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
|
||||
|
||||
//get table name
|
||||
String tableName = regionPath.getParent().getName();
|
||||
int totalBlkCount = 0;
|
||||
|
||||
// ignore null
|
||||
FileStatus[] cfList = fs.listStatus(regionPath);
|
||||
if (null == cfList) {
|
||||
return;
|
||||
}
|
||||
|
||||
// for each cf, get all the blocks information
|
||||
for (FileStatus cfStatus : cfList) {
|
||||
if (!cfStatus.isDir()) {
|
||||
// skip because this is not a CF directory
|
||||
continue;
|
||||
}
|
||||
if (cfStatus.getPath().getName().startsWith(".")) {
|
||||
continue;
|
||||
}
|
||||
FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
|
||||
if (null == storeFileLists) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (FileStatus storeFile : storeFileLists) {
|
||||
BlockLocation[] blkLocations =
|
||||
fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
|
||||
if (null == blkLocations) {
|
||||
continue;
|
||||
}
|
||||
|
||||
totalBlkCount += blkLocations.length;
|
||||
for(BlockLocation blk: blkLocations) {
|
||||
for (String host: blk.getHosts()) {
|
||||
AtomicInteger count = blockCountMap.get(host);
|
||||
if (count == null) {
|
||||
count = new AtomicInteger(0);
|
||||
blockCountMap.put(host, count);
|
||||
}
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (regionToBestLocalityRSMapping != null) {
|
||||
int largestBlkCount = 0;
|
||||
String hostToRun = null;
|
||||
for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
|
||||
String host = entry.getKey();
|
||||
|
||||
int tmp = entry.getValue().get();
|
||||
if (tmp > largestBlkCount) {
|
||||
largestBlkCount = tmp;
|
||||
hostToRun = host;
|
||||
}
|
||||
}
|
||||
|
||||
// empty regions could make this null
|
||||
if (null == hostToRun) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (hostToRun.endsWith(".")) {
|
||||
hostToRun = hostToRun.substring(0, hostToRun.length()-1);
|
||||
}
|
||||
String name = tableName + ":" + regionPath.getName();
|
||||
synchronized (regionToBestLocalityRSMapping) {
|
||||
regionToBestLocalityRSMapping.put(name, hostToRun);
|
||||
}
|
||||
}
|
||||
|
||||
if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
|
||||
Map<String, Float> hostLocalityMap = new HashMap<String, Float>();
|
||||
for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
|
||||
String host = entry.getKey();
|
||||
if (host.endsWith(".")) {
|
||||
host = host.substring(0, host.length() - 1);
|
||||
}
|
||||
// Locality is fraction of blocks local to this host.
|
||||
float locality = ((float)entry.getValue().get()) / totalBlkCount;
|
||||
hostLocalityMap.put(host, locality);
|
||||
}
|
||||
// Put the locality map into the result map, keyed by the encoded name
|
||||
// of the region.
|
||||
regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Problem scanning file system", e);
|
||||
} catch (RuntimeException e) {
|
||||
LOG.warn("Problem scanning file system", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,6 +35,10 @@ import java.util.HashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -61,6 +65,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
|||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionPlacementMaintainer;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -87,6 +92,8 @@ public abstract class FSUtils {
|
|||
|
||||
/** Full access permissions (starting point for a umask) */
|
||||
private static final String FULL_RWX_PERMISSIONS = "777";
|
||||
private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
|
||||
private static final int DEFAULT_THREAD_POOLSIZE = 2;
|
||||
|
||||
/** Set to true on Windows platforms */
|
||||
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
|
||||
|
@ -1698,4 +1705,181 @@ public abstract class FSUtils {
|
|||
fs.setTimes(src, EnvironmentEdgeManager.currentTimeMillis(), -1);
|
||||
return fs.rename(src, dest);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is to scan the root path of the file system to get the
|
||||
* degree of locality for each region on each of the servers having at least
|
||||
* one block of that region.
|
||||
* This is used by the tool {@link RegionPlacementMaintainer}
|
||||
*
|
||||
* @param conf
|
||||
* the configuration to use
|
||||
* @return the mapping from region encoded name to a map of server names to
|
||||
* locality fraction
|
||||
* @throws IOException
|
||||
* in case of file system errors or interrupts
|
||||
*/
|
||||
public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
|
||||
final Configuration conf) throws IOException {
|
||||
return getRegionDegreeLocalityMappingFromFS(
|
||||
conf, null,
|
||||
conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is to scan the root path of the file system to get the
|
||||
* degree of locality for each region on each of the servers having at least
|
||||
* one block of that region.
|
||||
*
|
||||
* @param conf
|
||||
* the configuration to use
|
||||
* @param desiredTable
|
||||
* the table you wish to scan locality for
|
||||
* @param threadPoolSize
|
||||
* the thread pool size to use
|
||||
* @return the mapping from region encoded name to a map of server names to
|
||||
* locality fraction
|
||||
* @throws IOException
|
||||
* in case of file system errors or interrupts
|
||||
*/
|
||||
public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
|
||||
final Configuration conf, final String desiredTable, int threadPoolSize)
|
||||
throws IOException {
|
||||
Map<String, Map<String, Float>> regionDegreeLocalityMapping =
|
||||
new ConcurrentHashMap<String, Map<String, Float>>();
|
||||
getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
|
||||
regionDegreeLocalityMapping);
|
||||
return regionDegreeLocalityMapping;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is to scan the root path of the file system to get either the
|
||||
* mapping between the region name and its best locality region server or the
|
||||
* degree of locality of each region on each of the servers having at least
|
||||
* one block of that region. The output map parameters are both optional.
|
||||
*
|
||||
* @param conf
|
||||
* the configuration to use
|
||||
* @param desiredTable
|
||||
* the table you wish to scan locality for
|
||||
* @param threadPoolSize
|
||||
* the thread pool size to use
|
||||
* @param regionToBestLocalityRSMapping
|
||||
* the map into which to put the best locality mapping or null
|
||||
* @param regionDegreeLocalityMapping
|
||||
* the map into which to put the locality degree mapping or null,
|
||||
* must be a thread-safe implementation
|
||||
* @throws IOException
|
||||
* in case of file system errors or interrupts
|
||||
*/
|
||||
private static void getRegionLocalityMappingFromFS(
|
||||
final Configuration conf, final String desiredTable,
|
||||
int threadPoolSize,
|
||||
Map<String, String> regionToBestLocalityRSMapping,
|
||||
Map<String, Map<String, Float>> regionDegreeLocalityMapping)
|
||||
throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Path rootPath = FSUtils.getRootDir(conf);
|
||||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
Path queryPath;
|
||||
// The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
|
||||
if (null == desiredTable) {
|
||||
queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR), "/*/*/");
|
||||
} else {
|
||||
queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)), "/*/");
|
||||
}
|
||||
|
||||
// reject all paths that are not appropriate
|
||||
PathFilter pathFilter = new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path path) {
|
||||
// this is the region name; it may get some noise data
|
||||
if (null == path) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// no parent?
|
||||
Path parent = path.getParent();
|
||||
if (null == parent) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// not part of a table?
|
||||
if (!parent.getName().equals(TableName.META_TABLE_NAME.getQualifierAsString())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String regionName = path.getName();
|
||||
if (null == regionName) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
|
||||
|
||||
if (null == statusList) {
|
||||
return;
|
||||
} else {
|
||||
LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
|
||||
statusList.length);
|
||||
}
|
||||
|
||||
// lower the number of threads in case we have very few expected regions
|
||||
threadPoolSize = Math.min(threadPoolSize, statusList.length);
|
||||
|
||||
// run in multiple threads
|
||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
|
||||
threadPoolSize, 60, TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<Runnable>(statusList.length));
|
||||
try {
|
||||
// ignore all file status items that are not of interest
|
||||
for (FileStatus regionStatus : statusList) {
|
||||
if (null == regionStatus) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!regionStatus.isDir()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Path regionPath = regionStatus.getPath();
|
||||
if (null == regionPath) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tpe.execute(new FSRegionScanner(fs, regionPath,
|
||||
regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
|
||||
}
|
||||
} finally {
|
||||
tpe.shutdown();
|
||||
int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
|
||||
60 * 1000);
|
||||
try {
|
||||
// here we wait until TPE terminates, which is either naturally or by
|
||||
// exceptions in the execution of the threads
|
||||
while (!tpe.awaitTermination(threadWakeFrequency,
|
||||
TimeUnit.MILLISECONDS)) {
|
||||
// printing out rough estimate, so as to not introduce
|
||||
// AtomicInteger
|
||||
LOG.info("Locality checking is underway: { Scanned Regions : "
|
||||
+ tpe.getCompletedTaskCount() + "/"
|
||||
+ tpe.getTaskCount() + " }");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
long overhead = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
|
||||
|
||||
LOG.info(overheadMsg);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,516 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Deque;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Computes the optimal (minimal cost) assignment of jobs to workers (or other
|
||||
* analogous) concepts given a cost matrix of each pair of job and worker, using
|
||||
* the algorithm by James Munkres in "Algorithms for the Assignment and
|
||||
* Transportation Problems", with additional optimizations as described by Jin
|
||||
* Kue Wong in "A New Implementation of an Algorithm for the Optimal Assignment
|
||||
* Problem: An Improved Version of Munkres' Algorithm". The algorithm runs in
|
||||
* O(n^3) time and need O(n^2) auxiliary space where n is the number of jobs or
|
||||
* workers, whichever is greater.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MunkresAssignment {
|
||||
|
||||
// The original algorithm by Munkres uses the terms STAR and PRIME to denote
// different states of zero values in the cost matrix. These values are
// represented as byte constants instead of enums to save space in the mask
// matrix by a factor of 4n^2 where n is the size of the problem.
private static final byte NONE = 0;
private static final byte STAR = 1;
private static final byte PRIME = 2;

// The algorithm requires that the number of column is at least as great as
// the number of rows. If that is not the case, then the cost matrix should
// be transposed before computation, and the solution matrix transposed before
// returning to the caller.
private final boolean transposed;

// The number of rows of internal matrices.
private final int rows;

// The number of columns of internal matrices.
private final int cols;

// The cost matrix, the cost of assigning each row index to column index.
// Nulled after solve() completes to release memory.
private float[][] cost;

// Mask of zero cost assignment states (NONE, STAR, or PRIME per cell).
private byte[][] mask;

// Covering some rows of the cost matrix.
private boolean[] rowsCovered;

// Covering some columns of the cost matrix.
private boolean[] colsCovered;

// The alternating path between starred zeroes and primed zeroes
private Deque<Pair<Integer, Integer>> path;

// The solution, marking which rows should be assigned to which columns. The
// positions of elements in this array correspond to the rows of the cost
// matrix, and the value of each element correspond to the columns of the cost
// matrix, i.e. assignments[i] = j indicates that row i should be assigned to
// column j. Null until solve() has been called; acts as the memoized result.
private int[] assignments;

// Improvements described by Jin Kue Wong cache the least value in each row,
// as well as the column index of the least value in each row, and the pending
// adjustments to each row and each column.
// least uncovered cost value per row
private float[] leastInRow;
// column index of that least value per row
private int[] leastInRowIndex;
// pending additive adjustment per row (applied lazily in testIsDone)
private float[] rowAdjust;
// pending additive adjustment per column (applied lazily in testIsDone)
private float[] colAdjust;
|
||||
|
||||
/**
 * Construct a new problem instance with the specified cost matrix. The cost
 * matrix must be rectangular, though not necessarily square. If one dimension
 * is greater than the other, some elements in the greater dimension will not
 * be assigned. The input cost matrix will not be modified.
 * @param costMatrix the rectangular, non-empty cost matrix
 * @throws IllegalArgumentException if the matrix is null or has no elements
 */
public MunkresAssignment(float[][] costMatrix) {
  // Fail fast with a clear message instead of a later NullPointerException
  // or ArrayIndexOutOfBoundsException on costMatrix[0].
  if (costMatrix == null || costMatrix.length == 0
      || costMatrix[0].length == 0) {
    throw new IllegalArgumentException("costMatrix must be non-empty");
  }

  // The algorithm assumes that the number of columns is at least as great as
  // the number of rows. If this is not the case of the input matrix, then
  // all internal structures must be transposed relative to the input.
  this.transposed = costMatrix.length > costMatrix[0].length;
  if (this.transposed) {
    this.rows = costMatrix[0].length;
    this.cols = costMatrix.length;
  } else {
    this.rows = costMatrix.length;
    this.cols = costMatrix[0].length;
  }

  cost = new float[rows][cols];
  mask = new byte[rows][cols];
  rowsCovered = new boolean[rows];
  colsCovered = new boolean[cols];
  path = new LinkedList<Pair<Integer, Integer>>();

  leastInRow = new float[rows];
  leastInRowIndex = new int[rows];
  rowAdjust = new float[rows];
  colAdjust = new float[cols];

  assignments = null;

  // Copy the cost matrix (transposing if needed), clamping infinite values
  // to MAX_VALUE in the same pass. Costs must be finite otherwise the matrix
  // can get into a bad state where no progress can be made. If your use case
  // depends on a distinction between costs of MAX_VALUE and
  // POSITIVE_INFINITY, you're doing it wrong.
  for (int r = 0; r < rows; r++) {
    for (int c = 0; c < cols; c++) {
      float v = transposed ? costMatrix[c][r] : costMatrix[r][c];
      cost[r][c] = (v == Float.POSITIVE_INFINITY) ? Float.MAX_VALUE : v;
    }
  }
}
|
||||
|
||||
/**
 * Get the optimal assignments. The returned array will have the same number
 * of elements as the number of elements as the number of rows in the input
 * cost matrix. Each element will indicate which column should be assigned to
 * that row or -1 if no column should be assigned, i.e. if result[i] = j then
 * row i should be assigned to column j. Subsequent invocations of this method
 * will simply return the same object without additional computation.
 * @return an array with the optimal assignments
 */
public int[] solve() {
  // If this assignment problem has already been solved, return the known
  // solution
  if (assignments != null) {
    return assignments;
  }

  preliminaries();

  // Find the optimal assignments: alternate between finding/priming
  // uncovered zeroes (step one), augmenting the starred set along a path
  // (step two), and adjusting the cost matrix (step three) until every row
  // has a starred zero.
  while (!testIsDone()) {
    while (!stepOne()) {
      stepThree();
    }
    stepTwo();
  }

  // Extract the assignments from the mask matrix.
  if (transposed) {
    // Internal matrices are transposed relative to the input, so rows and
    // columns swap roles when reading out the result.
    assignments = new int[cols];
    outer:
    for (int c = 0; c < cols; c++) {
      for (int r = 0; r < rows; r++) {
        if (mask[r][c] == STAR) {
          assignments[c] = r;
          continue outer;
        }
      }
      // There is no assignment for this row of the input/output.
      assignments[c] = -1;
    }
  } else {
    // NOTE(review): unlike the transposed branch, a row with no starred zero
    // keeps the array default 0 here rather than -1. By completion every row
    // (rows <= cols) should hold a star, so this should be unreachable —
    // confirm.
    assignments = new int[rows];
    outer:
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        if (mask[r][c] == STAR) {
          assignments[r] = c;
          continue outer;
        }
      }
    }
  }

  // Once the solution has been computed, there is no need to keep any of the
  // other internal structures. Clear all unnecessary internal references so
  // the garbage collector may reclaim that memory.
  cost = null;
  mask = null;
  rowsCovered = null;
  colsCovered = null;
  path = null;
  leastInRow = null;
  leastInRowIndex = null;
  rowAdjust = null;
  colAdjust = null;

  return assignments;
}
|
||||
|
||||
/**
|
||||
* Corresponds to the "preliminaries" step of the original algorithm.
|
||||
* Guarantees that the matrix is an equivalent non-negative matrix with at
|
||||
* least one zero in each row.
|
||||
*/
|
||||
private void preliminaries() {
|
||||
for (int r = 0; r < rows; r++) {
|
||||
// Find the minimum cost of each row.
|
||||
float min = Float.POSITIVE_INFINITY;
|
||||
for (int c = 0; c < cols; c++) {
|
||||
min = Math.min(min, cost[r][c]);
|
||||
}
|
||||
|
||||
// Subtract that minimum cost from each element in the row.
|
||||
for (int c = 0; c < cols; c++) {
|
||||
cost[r][c] -= min;
|
||||
|
||||
// If the element is now zero and there are no zeroes in the same row
|
||||
// or column which are already starred, then star this one. There
|
||||
// must be at least one zero because of subtracting the min cost.
|
||||
if (cost[r][c] == 0 && !rowsCovered[r] && !colsCovered[c]) {
|
||||
mask[r][c] = STAR;
|
||||
// Cover this row and column so that no other zeroes in them can be
|
||||
// starred.
|
||||
rowsCovered[r] = true;
|
||||
colsCovered[c] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the covered rows and columns.
|
||||
Arrays.fill(rowsCovered, false);
|
||||
Arrays.fill(colsCovered, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether the algorithm is done, i.e. we have the optimal assignment.
|
||||
* This occurs when there is exactly one starred zero in each row.
|
||||
* @return true if the algorithm is done
|
||||
*/
|
||||
private boolean testIsDone() {
|
||||
// Cover all columns containing a starred zero. There can be at most one
|
||||
// starred zero per column. Therefore, a covered column has an optimal
|
||||
// assignment.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (mask[r][c] == STAR) {
|
||||
colsCovered[c] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Count the total number of covered columns.
|
||||
int coveredCols = 0;
|
||||
for (int c = 0; c < cols; c++) {
|
||||
coveredCols += colsCovered[c] ? 1 : 0;
|
||||
}
|
||||
|
||||
// Apply an row and column adjustments that are pending.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
for (int c = 0; c < cols; c++) {
|
||||
cost[r][c] += rowAdjust[r];
|
||||
cost[r][c] += colAdjust[c];
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the pending row and column adjustments.
|
||||
Arrays.fill(rowAdjust, 0);
|
||||
Arrays.fill(colAdjust, 0);
|
||||
|
||||
// The covers on columns and rows may have been reset, recompute the least
|
||||
// value for each row.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
leastInRow[r] = Float.POSITIVE_INFINITY;
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (!rowsCovered[r] && !colsCovered[c] && cost[r][c] < leastInRow[r]) {
|
||||
leastInRow[r] = cost[r][c];
|
||||
leastInRowIndex[r] = c;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If all columns are covered, then we are done. Since there may be more
|
||||
// columns than rows, we are also done if the number of covered columns is
|
||||
// at least as great as the number of rows.
|
||||
return (coveredCols == cols || coveredCols >= rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* Corresponds to step 1 of the original algorithm.
|
||||
* @return false if all zeroes are covered
|
||||
*/
|
||||
private boolean stepOne() {
|
||||
while (true) {
|
||||
Pair<Integer, Integer> zero = findUncoveredZero();
|
||||
if (zero == null) {
|
||||
// No uncovered zeroes, need to manipulate the cost matrix in step
|
||||
// three.
|
||||
return false;
|
||||
} else {
|
||||
// Prime the uncovered zero and find a starred zero in the same row.
|
||||
mask[zero.getFirst()][zero.getSecond()] = PRIME;
|
||||
Pair<Integer, Integer> star = starInRow(zero.getFirst());
|
||||
if (star != null) {
|
||||
// Cover the row with both the newly primed zero and the starred zero.
|
||||
// Since this is the only place where zeroes are primed, and we cover
|
||||
// it here, and rows are only uncovered when primes are erased, then
|
||||
// there can be at most one primed uncovered zero.
|
||||
rowsCovered[star.getFirst()] = true;
|
||||
colsCovered[star.getSecond()] = false;
|
||||
updateMin(star.getFirst(), star.getSecond());
|
||||
} else {
|
||||
// Will go to step two after, where a path will be constructed,
|
||||
// starting from the uncovered primed zero (there is only one). Since
|
||||
// we have already found it, save it as the first node in the path.
|
||||
path.clear();
|
||||
path.offerLast(new Pair<Integer, Integer>(zero.getFirst(),
|
||||
zero.getSecond()));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Corresponds to step 2 of the original algorithm.
|
||||
*/
|
||||
private void stepTwo() {
|
||||
// Construct a path of alternating starred zeroes and primed zeroes, where
|
||||
// each starred zero is in the same column as the previous primed zero, and
|
||||
// each primed zero is in the same row as the previous starred zero. The
|
||||
// path will always end in a primed zero.
|
||||
while (true) {
|
||||
Pair<Integer, Integer> star = starInCol(path.getLast().getSecond());
|
||||
if (star != null) {
|
||||
path.offerLast(star);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
Pair<Integer, Integer> prime = primeInRow(path.getLast().getFirst());
|
||||
path.offerLast(prime);
|
||||
}
|
||||
|
||||
// Augment path - unmask all starred zeroes and star all primed zeroes. All
|
||||
// nodes in the path will be either starred or primed zeroes. The set of
|
||||
// starred zeroes is independent and now one larger than before.
|
||||
for (Pair<Integer, Integer> p : path) {
|
||||
if (mask[p.getFirst()][p.getSecond()] == STAR) {
|
||||
mask[p.getFirst()][p.getSecond()] = NONE;
|
||||
} else {
|
||||
mask[p.getFirst()][p.getSecond()] = STAR;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear all covers from rows and columns.
|
||||
Arrays.fill(rowsCovered, false);
|
||||
Arrays.fill(colsCovered, false);
|
||||
|
||||
// Remove the prime mask from all primed zeroes.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (mask[r][c] == PRIME) {
|
||||
mask[r][c] = NONE;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Corresponds to step 3 of the original algorithm.
|
||||
*/
|
||||
private void stepThree() {
|
||||
// Find the minimum uncovered cost.
|
||||
float min = leastInRow[0];
|
||||
for (int r = 1; r < rows; r++) {
|
||||
if (leastInRow[r] < min) {
|
||||
min = leastInRow[r];
|
||||
}
|
||||
}
|
||||
|
||||
// Add the minimum cost to each of the costs in a covered row, or subtract
|
||||
// the minimum cost from each of the costs in an uncovered column. As an
|
||||
// optimization, do not actually modify the cost matrix yet, but track the
|
||||
// adjustments that need to be made to each row and column.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
if (rowsCovered[r]) {
|
||||
rowAdjust[r] += min;
|
||||
}
|
||||
}
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (!colsCovered[c]) {
|
||||
colAdjust[c] -= min;
|
||||
}
|
||||
}
|
||||
|
||||
// Since the cost matrix is not being updated yet, the minimum uncovered
|
||||
// cost per row must be updated.
|
||||
for (int r = 0; r < rows; r++) {
|
||||
if (!colsCovered[leastInRowIndex[r]]) {
|
||||
// The least value in this row was in an uncovered column, meaning that
|
||||
// it would have had the minimum value subtracted from it, and therefore
|
||||
// will still be the minimum value in that row.
|
||||
leastInRow[r] -= min;
|
||||
} else {
|
||||
// The least value in this row was in a covered column and would not
|
||||
// have had the minimum value subtracted from it, so the minimum value
|
||||
// could be some in another column.
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (cost[r][c] + colAdjust[c] + rowAdjust[r] < leastInRow[r]) {
|
||||
leastInRow[r] = cost[r][c] + colAdjust[c] + rowAdjust[r];
|
||||
leastInRowIndex[r] = c;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a zero cost assignment which is not covered. If there are no zero cost
|
||||
* assignments which are uncovered, then null will be returned.
|
||||
* @return pair of row and column indices of an uncovered zero or null
|
||||
*/
|
||||
private Pair<Integer, Integer> findUncoveredZero() {
|
||||
for (int r = 0; r < rows; r++) {
|
||||
if (leastInRow[r] == 0) {
|
||||
return new Pair<Integer, Integer>(r, leastInRowIndex[r]);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* A specified row has become covered, and a specified column has become
|
||||
* uncovered. The least value per row may need to be updated.
|
||||
* @param row the index of the row which was just covered
|
||||
* @param col the index of the column which was just uncovered
|
||||
*/
|
||||
private void updateMin(int row, int col) {
|
||||
// If the row is covered we want to ignore it as far as least values go.
|
||||
leastInRow[row] = Float.POSITIVE_INFINITY;
|
||||
|
||||
for (int r = 0; r < rows; r++) {
|
||||
// Since the column has only just been uncovered, it could not have any
|
||||
// pending adjustments. Only covered rows can have pending adjustments
|
||||
// and covered costs do not count toward row minimums. Therefore, we do
|
||||
// not need to consider rowAdjust[r] or colAdjust[col].
|
||||
if (!rowsCovered[r] && cost[r][col] < leastInRow[r]) {
|
||||
leastInRow[r] = cost[r][col];
|
||||
leastInRowIndex[r] = col;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a starred zero in a specified row. If there are no starred zeroes in
|
||||
* the specified row, then null will be returned.
|
||||
* @param r the index of the row to be searched
|
||||
* @return pair of row and column indices of starred zero or null
|
||||
*/
|
||||
private Pair<Integer, Integer> starInRow(int r) {
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (mask[r][c] == STAR) {
|
||||
return new Pair<Integer, Integer>(r, c);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a starred zero in the specified column. If there are no starred zeroes
|
||||
* in the specified row, then null will be returned.
|
||||
* @param c the index of the column to be searched
|
||||
* @return pair of row and column indices of starred zero or null
|
||||
*/
|
||||
private Pair<Integer, Integer> starInCol(int c) {
|
||||
for (int r = 0; r < rows; r++) {
|
||||
if (mask[r][c] == STAR) {
|
||||
return new Pair<Integer, Integer>(r, c);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a primed zero in the specified row. If there are no primed zeroes in
|
||||
* the specified row, then null will be returned.
|
||||
* @param r the index of the row to be searched
|
||||
* @return pair of row and column indices of primed zero or null
|
||||
*/
|
||||
private Pair<Integer, Integer> primeInRow(int r) {
|
||||
for (int c = 0; c < cols; c++) {
|
||||
if (mask[r][c] == PRIME) {
|
||||
return new Pair<Integer, Integer>(r, c);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
|
||||
|
@ -557,4 +559,10 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
|
||||
UpdateFavoredNodesRequest request) throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,16 +25,19 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -53,15 +56,19 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodes.Position;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan.Position;
|
||||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestRegionPlacement {
|
||||
|
@ -69,7 +76,9 @@ public class TestRegionPlacement {
|
|||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private final static int SLAVES = 10;
|
||||
private static HBaseAdmin admin;
|
||||
private static RegionPlacementMaintainer rp;
|
||||
private static Position[] positions = Position.values();
|
||||
private int lastRegionOnPrimaryRSCount = 0;
|
||||
private int REGION_NUM = 10;
|
||||
private Map<HRegionInfo, ServerName[]> favoredNodesAssignmentPlan =
|
||||
new HashMap<HRegionInfo, ServerName[]>();
|
||||
|
@ -86,6 +95,7 @@ public class TestRegionPlacement {
|
|||
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
TEST_UTIL.startMiniCluster(SLAVES);
|
||||
admin = new HBaseAdmin(conf);
|
||||
rp = new RegionPlacementMaintainer(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -198,19 +208,255 @@ public class TestRegionPlacement {
|
|||
!favoredNodesNow.contains(favoredNodesAfter.get(TERTIARY)));
|
||||
}
|
||||
|
||||
@Test(timeout = 180000)
|
||||
@Test
|
||||
public void testRegionPlacement() throws Exception {
|
||||
byte[] table = Bytes.toBytes("testRegionAssignment");
|
||||
// Create a table with REGION_NUM regions.
|
||||
createTable("testRegionAssignment", REGION_NUM);
|
||||
createTable(table, REGION_NUM);
|
||||
|
||||
TEST_UTIL.waitTableAvailable(Bytes.toBytes("testRegionAssignment"));
|
||||
TEST_UTIL.waitTableAvailable(table);
|
||||
|
||||
// Verify all the user regions are assigned to the primary region server
|
||||
// based on the plan
|
||||
countRegionOnPrimaryRS(REGION_NUM);
|
||||
verifyRegionOnPrimaryRS(REGION_NUM);
|
||||
|
||||
FavoredNodesPlan currentPlan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
|
||||
// Verify all the region server are update with the latest favored nodes
|
||||
verifyRegionServerUpdated();
|
||||
verifyRegionServerUpdated(currentPlan);
|
||||
// Test Case 2: To verify whether the region placement tools can
|
||||
// correctly update the new assignment plan to META and Region Server.
|
||||
// The new assignment plan is generated by shuffle the existing assignment
|
||||
// plan by switching PRIMARY, SECONDARY and TERTIARY nodes.
|
||||
// Shuffle the plan by switching the secondary region server with
|
||||
// the tertiary.
|
||||
|
||||
// Shuffle the secondary with tertiary favored nodes
|
||||
FavoredNodesPlan shuffledPlan = this.shuffleAssignmentPlan(currentPlan,
|
||||
FavoredNodesPlan.Position.SECONDARY, FavoredNodesPlan.Position.TERTIARY);
|
||||
// Let the region placement update the META and Region Servers
|
||||
rp.updateAssignmentPlan(shuffledPlan);
|
||||
|
||||
// Verify the region assignment. There are supposed to no region reassignment
|
||||
// All the regions are still on the primary region server
|
||||
verifyRegionAssignment(shuffledPlan,0, REGION_NUM);
|
||||
|
||||
// Shuffle the plan by switching the primary with secondary and
|
||||
// verify the region reassignment is consistent with the plan.
|
||||
shuffledPlan = this.shuffleAssignmentPlan(currentPlan,
|
||||
FavoredNodesPlan.Position.PRIMARY, FavoredNodesPlan.Position.SECONDARY);
|
||||
|
||||
// Let the region placement update the META and Region Servers
|
||||
rp.updateAssignmentPlan(shuffledPlan);
|
||||
|
||||
verifyRegionAssignment(shuffledPlan, REGION_NUM, REGION_NUM);
|
||||
// Check when a RS stops, the regions get assigned to their secondary/tertiary
|
||||
killRandomServerAndVerifyAssignment();
|
||||
RegionPlacementMaintainer.printAssignmentPlan(currentPlan);
|
||||
}
|
||||
|
||||
private void killRandomServerAndVerifyAssignment()
|
||||
throws IOException, InterruptedException {
|
||||
ClusterStatus oldStatus = TEST_UTIL.getHBaseCluster().getClusterStatus();
|
||||
ServerName servers[] = oldStatus.getServers().toArray(new ServerName[10]);
|
||||
ServerName serverToKill = null;
|
||||
int killIndex = 0;
|
||||
Random random = new Random(System.currentTimeMillis());
|
||||
ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
|
||||
LOG.debug("Server holding meta " + metaServer);
|
||||
do {
|
||||
// kill a random non-meta server carrying at least one region
|
||||
killIndex = random.nextInt(servers.length);
|
||||
serverToKill = TEST_UTIL.getHBaseCluster().getRegionServer(killIndex).getServerName();
|
||||
} while (ServerName.isSameHostnameAndPort(metaServer, serverToKill) ||
|
||||
TEST_UTIL.getHBaseCluster().getRegionServer(killIndex).getNumberOfOnlineRegions() == 0);
|
||||
LOG.debug("Stopping RS " + serverToKill);
|
||||
Map<HRegionInfo, Pair<ServerName, ServerName>> regionsToVerify =
|
||||
new HashMap<HRegionInfo, Pair<ServerName, ServerName>>();
|
||||
// mark the regions to track
|
||||
for (Map.Entry<HRegionInfo, ServerName[]> entry : favoredNodesAssignmentPlan.entrySet()) {
|
||||
ServerName s = entry.getValue()[0];
|
||||
if (ServerName.isSameHostnameAndPort(s, serverToKill)) {
|
||||
regionsToVerify.put(entry.getKey(), new Pair<ServerName, ServerName>(
|
||||
entry.getValue()[1], entry.getValue()[2]));
|
||||
LOG.debug("Adding " + entry.getKey() + " with sedcondary/tertiary " +
|
||||
entry.getValue()[1] + " " + entry.getValue()[2]);
|
||||
}
|
||||
}
|
||||
int orig = TEST_UTIL.getHBaseCluster().getMaster().assignmentManager.getNumRegionsOpened();
|
||||
TEST_UTIL.getHBaseCluster().stopRegionServer(serverToKill);
|
||||
TEST_UTIL.getHBaseCluster().waitForRegionServerToStop(serverToKill, 60000);
|
||||
int curr = TEST_UTIL.getHBaseCluster().getMaster().assignmentManager.getNumRegionsOpened();
|
||||
while (curr - orig < regionsToVerify.size()) {
|
||||
LOG.debug("Waiting for " + regionsToVerify.size() + " to come online " +
|
||||
" Current #regions " + curr + " Original #regions " + orig);
|
||||
Thread.sleep(200);
|
||||
curr = TEST_UTIL.getHBaseCluster().getMaster().assignmentManager.getNumRegionsOpened();
|
||||
}
|
||||
// now verify
|
||||
for (Map.Entry<HRegionInfo, Pair<ServerName, ServerName>> entry : regionsToVerify.entrySet()) {
|
||||
ServerName newDestination = TEST_UTIL.getHBaseCluster().getMaster()
|
||||
.getAssignmentManager().getRegionStates().getRegionServerOfRegion(entry.getKey());
|
||||
Pair<ServerName, ServerName> secondaryTertiaryServers = entry.getValue();
|
||||
LOG.debug("New destination for region " + entry.getKey().getEncodedName() +
|
||||
" " + newDestination +". Secondary/Tertiary are " + secondaryTertiaryServers.getFirst()
|
||||
+ "/" + secondaryTertiaryServers.getSecond());
|
||||
if (!(ServerName.isSameHostnameAndPort(newDestination, secondaryTertiaryServers.getFirst())||
|
||||
ServerName.isSameHostnameAndPort(newDestination, secondaryTertiaryServers.getSecond()))){
|
||||
fail("Region " + entry.getKey() + " not present on any of the expected servers");
|
||||
}
|
||||
}
|
||||
// start(reinstate) region server since we killed one before
|
||||
TEST_UTIL.getHBaseCluster().startRegionServer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to test the correctness of this class.
|
||||
*/
|
||||
@Test
|
||||
public void testRandomizedMatrix() {
|
||||
int rows = 100;
|
||||
int cols = 100;
|
||||
float[][] matrix = new float[rows][cols];
|
||||
Random random = new Random();
|
||||
for (int i = 0; i < rows; i++) {
|
||||
for (int j = 0; j < cols; j++) {
|
||||
matrix[i][j] = random.nextFloat();
|
||||
}
|
||||
}
|
||||
|
||||
// Test that inverting a transformed matrix gives the original matrix.
|
||||
RegionPlacementMaintainer.RandomizedMatrix rm =
|
||||
new RegionPlacementMaintainer.RandomizedMatrix(rows, cols);
|
||||
float[][] transformed = rm.transform(matrix);
|
||||
float[][] invertedTransformed = rm.invert(transformed);
|
||||
for (int i = 0; i < rows; i++) {
|
||||
for (int j = 0; j < cols; j++) {
|
||||
if (matrix[i][j] != invertedTransformed[i][j]) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the indices on a transformed matrix can be inverted to give
|
||||
// the same values on the original matrix.
|
||||
int[] transformedIndices = new int[rows];
|
||||
for (int i = 0; i < rows; i++) {
|
||||
transformedIndices[i] = random.nextInt(cols);
|
||||
}
|
||||
int[] invertedTransformedIndices = rm.invertIndices(transformedIndices);
|
||||
float[] transformedValues = new float[rows];
|
||||
float[] invertedTransformedValues = new float[rows];
|
||||
for (int i = 0; i < rows; i++) {
|
||||
transformedValues[i] = transformed[i][transformedIndices[i]];
|
||||
invertedTransformedValues[i] = matrix[i][invertedTransformedIndices[i]];
|
||||
}
|
||||
Arrays.sort(transformedValues);
|
||||
Arrays.sort(invertedTransformedValues);
|
||||
if (!Arrays.equals(transformedValues, invertedTransformedValues)) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuffle the assignment plan by switching two favored node positions.
|
||||
* @param plan The assignment plan
|
||||
* @param p1 The first switch position
|
||||
* @param p2 The second switch position
|
||||
* @return
|
||||
*/
|
||||
private FavoredNodesPlan shuffleAssignmentPlan(FavoredNodesPlan plan,
|
||||
FavoredNodesPlan.Position p1, FavoredNodesPlan.Position p2) {
|
||||
FavoredNodesPlan shuffledPlan = new FavoredNodesPlan();
|
||||
|
||||
for (Map.Entry<HRegionInfo, List<ServerName>> entry :
|
||||
plan.getAssignmentMap().entrySet()) {
|
||||
HRegionInfo region = entry.getKey();
|
||||
|
||||
// copy the server list from the original plan
|
||||
List<ServerName> shuffledServerList = new ArrayList<ServerName>();
|
||||
shuffledServerList.addAll(entry.getValue());
|
||||
|
||||
// start to shuffle
|
||||
shuffledServerList.set(p1.ordinal(), entry.getValue().get(p2.ordinal()));
|
||||
shuffledServerList.set(p2.ordinal(), entry.getValue().get(p1.ordinal()));
|
||||
|
||||
// update the plan
|
||||
shuffledPlan.updateAssignmentPlan(region, shuffledServerList);
|
||||
}
|
||||
return shuffledPlan;
|
||||
}
|
||||
|
||||
/**
|
||||
* To verify the region assignment status.
|
||||
* It will check the assignment plan consistency between META and
|
||||
* region servers.
|
||||
* Also it will verify weather the number of region movement and
|
||||
* the number regions on the primary region server are expected
|
||||
*
|
||||
* @param plan
|
||||
* @param regionMovementNum
|
||||
* @param numRegionsOnPrimaryRS
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
*/
|
||||
private void verifyRegionAssignment(FavoredNodesPlan plan,
|
||||
int regionMovementNum, int numRegionsOnPrimaryRS)
|
||||
throws InterruptedException, IOException {
|
||||
// Verify the assignment plan in META is consistent with the expected plan.
|
||||
verifyMETAUpdated(plan);
|
||||
|
||||
// Verify the number of region movement is expected
|
||||
verifyRegionMovementNum(regionMovementNum);
|
||||
|
||||
// Verify the number of regions is assigned to the primary region server
|
||||
// based on the plan is expected
|
||||
verifyRegionOnPrimaryRS(numRegionsOnPrimaryRS);
|
||||
|
||||
// Verify all the online region server are updated with the assignment plan
|
||||
verifyRegionServerUpdated(plan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the meta has updated to the latest assignment plan
|
||||
* @param plan
|
||||
* @throws IOException
|
||||
*/
|
||||
private void verifyMETAUpdated(FavoredNodesPlan expectedPlan)
|
||||
throws IOException {
|
||||
FavoredNodesPlan planFromMETA = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
|
||||
assertTrue("The assignment plan is NOT consistent with the expected plan ",
|
||||
planFromMETA.equals(expectedPlan));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the number of region movement is expected
|
||||
*/
|
||||
private void verifyRegionMovementNum(int expected)
|
||||
throws InterruptedException, HBaseIOException {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
HMaster m = cluster.getMaster();
|
||||
int lastRegionOpenedCount = m.assignmentManager.getNumRegionsOpened();
|
||||
// get the assignments start to execute
|
||||
m.balance();
|
||||
|
||||
int retry = 10;
|
||||
long sleep = 3000;
|
||||
int attempt = 0;
|
||||
int currentRegionOpened, regionMovement;
|
||||
do {
|
||||
currentRegionOpened = m.assignmentManager.getNumRegionsOpened();
|
||||
regionMovement= currentRegionOpened - lastRegionOpenedCount;
|
||||
LOG.debug("There are " + regionMovement + "/" + expected +
|
||||
" regions moved after " + attempt + " attempts");
|
||||
Thread.sleep((++attempt) * sleep);
|
||||
} while (regionMovement != expected && attempt <= retry);
|
||||
|
||||
// update the lastRegionOpenedCount
|
||||
lastRegionOpenedCount = currentRegionOpened;
|
||||
|
||||
assertEquals("There are only " + regionMovement + " instead of "
|
||||
+ expected + " region movement for " + attempt + " attempts",
|
||||
regionMovement, expected);
|
||||
}
|
||||
|
||||
private List<ServerName> removeMatchingServers(ServerName serverWithoutStartCode,
|
||||
|
@ -240,9 +486,9 @@ public class TestRegionPlacement {
|
|||
* @param expectedNum.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void countRegionOnPrimaryRS(int expectedNum)
|
||||
private void verifyRegionOnPrimaryRS(int expectedNum)
|
||||
throws IOException {
|
||||
int lastRegionOnPrimaryRSCount = getNumRegionisOnPrimaryRS();
|
||||
lastRegionOnPrimaryRSCount = getNumRegionisOnPrimaryRS();
|
||||
assertEquals("Only " + expectedNum + " of user regions running " +
|
||||
"on the primary region server", expectedNum ,
|
||||
lastRegionOnPrimaryRSCount);
|
||||
|
@ -254,7 +500,7 @@ public class TestRegionPlacement {
|
|||
* @param plan
|
||||
* @throws IOException
|
||||
*/
|
||||
private void verifyRegionServerUpdated() throws IOException {
|
||||
private void verifyRegionServerUpdated(FavoredNodesPlan plan) throws IOException {
|
||||
// Verify all region servers contain the correct favored nodes information
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
for (int i = 0; i < SLAVES; i++) {
|
||||
|
@ -263,7 +509,7 @@ public class TestRegionPlacement {
|
|||
TableName.valueOf("testRegionAssignment"))) {
|
||||
InetSocketAddress[] favoredSocketAddress = rs.getFavoredNodesForRegion(
|
||||
region.getRegionInfo().getEncodedName());
|
||||
ServerName[] favoredServerList = favoredNodesAssignmentPlan.get(region.getRegionInfo());
|
||||
List<ServerName> favoredServerList = plan.getAssignmentMap().get(region.getRegionInfo());
|
||||
|
||||
// All regions are supposed to have favored nodes,
|
||||
// except for META and ROOT
|
||||
|
@ -278,12 +524,12 @@ public class TestRegionPlacement {
|
|||
} else {
|
||||
// For user region, the favored nodes in the region server should be
|
||||
// identical to favored nodes in the assignmentPlan
|
||||
assertTrue(favoredSocketAddress.length == favoredServerList.length);
|
||||
assertTrue(favoredServerList.length > 0);
|
||||
for (int j = 0; j < favoredServerList.length; j++) {
|
||||
assertTrue(favoredSocketAddress.length == favoredServerList.size());
|
||||
assertTrue(favoredServerList.size() > 0);
|
||||
for (int j = 0; j < favoredServerList.size(); j++) {
|
||||
InetSocketAddress addrFromRS = favoredSocketAddress[j];
|
||||
InetSocketAddress addrFromPlan = InetSocketAddress.createUnresolved(
|
||||
favoredServerList[j].getHostname(), favoredServerList[j].getPort());
|
||||
favoredServerList.get(j).getHostname(), favoredServerList.get(j).getPort());
|
||||
|
||||
assertNotNull(addrFromRS);
|
||||
assertNotNull(addrFromPlan);
|
||||
|
@ -379,9 +625,8 @@ public class TestRegionPlacement {
|
|||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void createTable(String table, int regionNum)
|
||||
private static void createTable(byte[] tableName, int regionNum)
|
||||
throws IOException {
|
||||
byte[] tableName = Bytes.toBytes(table);
|
||||
int expectedRegions = regionNum;
|
||||
byte[][] splitKeys = new byte[expectedRegions - 1][];
|
||||
for (int i = 1; i < expectedRegions; i++) {
|
||||
|
|
Loading…
Reference in New Issue