HBASE-23006 RSGroupBasedLoadBalancer should also try to place replicas for the same region to different region servers (#605)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
89f011eae7
commit
6f68147316
|
@ -109,6 +109,12 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-procedure</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-procedure</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-protocol</artifactId>
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
@ -160,23 +159,21 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
|
||||
List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
|
||||
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
|
||||
List<ServerName> servers) throws HBaseIOException {
|
||||
Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
|
||||
ListMultimap<String,RegionInfo> regionMap = ArrayListMultimap.create();
|
||||
ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
|
||||
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
|
||||
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
|
||||
generateGroupMaps(regions, servers, regionMap, serverMap);
|
||||
for(String groupKey : regionMap.keySet()) {
|
||||
for (String groupKey : regionMap.keySet()) {
|
||||
if (regionMap.get(groupKey).size() > 0) {
|
||||
Map<ServerName, List<RegionInfo>> result =
|
||||
this.internalBalancer.roundRobinAssignment(
|
||||
regionMap.get(groupKey),
|
||||
serverMap.get(groupKey));
|
||||
if(result != null) {
|
||||
if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
|
||||
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){
|
||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
|
||||
result.get(LoadBalancer.BOGUS_SERVER_NAME));
|
||||
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
|
||||
.roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
|
||||
if (result != null) {
|
||||
if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
|
||||
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
|
||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
|
||||
.addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
|
||||
} else {
|
||||
assignments.putAll(result);
|
||||
}
|
||||
|
@ -187,24 +184,19 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<ServerName, List<RegionInfo>> retainAssignment(
|
||||
Map<RegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
|
||||
public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
|
||||
List<ServerName> servers) throws HBaseIOException {
|
||||
try {
|
||||
Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
|
||||
ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
|
||||
Set<RegionInfo> misplacedRegions = getMisplacedRegions(regions);
|
||||
for (RegionInfo region : regions.keySet()) {
|
||||
if (!misplacedRegions.contains(region)) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
groupToRegion.put(groupName, region);
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
groupToRegion.put(groupName, region);
|
||||
}
|
||||
// Now the "groupToRegion" map has only the regions which have correct
|
||||
// assignments.
|
||||
for (String key : groupToRegion.keySet()) {
|
||||
Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
|
||||
List<RegionInfo> regionList = groupToRegion.get(key);
|
||||
|
@ -213,43 +205,16 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
for (RegionInfo region : regionList) {
|
||||
currentAssignmentMap.put(region, regions.get(region));
|
||||
}
|
||||
if(candidateList.size() > 0) {
|
||||
assignments.putAll(this.internalBalancer.retainAssignment(
|
||||
currentAssignmentMap, candidateList));
|
||||
} else{
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No available server to assign regions: " + regionList.toString());
|
||||
}
|
||||
for(RegionInfo region : regionList) {
|
||||
if (!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
|
||||
assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
|
||||
}
|
||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (RegionInfo region : misplacedRegions) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
|
||||
List<ServerName> candidateList = filterOfflineServers(info, servers);
|
||||
ServerName server = this.internalBalancer.randomAssignment(region,
|
||||
candidateList);
|
||||
if (server != null) {
|
||||
if (!assignments.containsKey(server)) {
|
||||
assignments.put(server, new ArrayList<>());
|
||||
}
|
||||
assignments.get(server).add(region);
|
||||
if (candidateList.size() > 0) {
|
||||
assignments
|
||||
.putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
|
||||
} else {
|
||||
//if not server is available assign to bogus so it ends up in RIT
|
||||
if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
|
||||
assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No available servers to assign regions: {}",
|
||||
RegionInfo.getShortNameToLog(regionList));
|
||||
}
|
||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
|
||||
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
|
||||
.addAll(regionList);
|
||||
}
|
||||
}
|
||||
return assignments;
|
||||
|
@ -268,11 +233,9 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
return this.internalBalancer.randomAssignment(region, filteredServers);
|
||||
}
|
||||
|
||||
private void generateGroupMaps(
|
||||
List<RegionInfo> regions,
|
||||
List<ServerName> servers,
|
||||
ListMultimap<String, RegionInfo> regionMap,
|
||||
ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
|
||||
private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
|
||||
ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
|
||||
throws HBaseIOException {
|
||||
try {
|
||||
for (RegionInfo region : regions) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
|
@ -295,74 +258,37 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
}
|
||||
|
||||
private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
|
||||
List<ServerName> onlineServers) {
|
||||
List<ServerName> onlineServers) {
|
||||
if (RSGroupInfo != null) {
|
||||
return filterServers(RSGroupInfo.getServers(), onlineServers);
|
||||
} else {
|
||||
LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
|
||||
return Collections.EMPTY_LIST;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter servers based on the online servers.
|
||||
*
|
||||
* @param servers
|
||||
* the servers
|
||||
* @param onlineServers
|
||||
* List of servers which are online.
|
||||
* <p/>
|
||||
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
|
||||
* its contains()'s time complexity as O(logn), which is good enough.
|
||||
* <p/>
|
||||
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
|
||||
* needed.
|
||||
* @param servers the servers
|
||||
* @param onlineServers List of servers which are online.
|
||||
* @return the list
|
||||
*/
|
||||
private List<ServerName> filterServers(Set<Address> servers,
|
||||
List<ServerName> onlineServers) {
|
||||
/**
|
||||
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}),
|
||||
* having its contains()'s time complexity as O(logn), which is good enough.
|
||||
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain
|
||||
* if needed. */
|
||||
private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
|
||||
ArrayList<ServerName> finalList = new ArrayList<>();
|
||||
for (ServerName onlineServer : onlineServers) {
|
||||
if (servers.contains(onlineServer.getAddress())) {
|
||||
finalList.add(onlineServer);
|
||||
}
|
||||
}
|
||||
|
||||
return finalList;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Set<RegionInfo> getMisplacedRegions(
|
||||
Map<RegionInfo, ServerName> regions) throws IOException {
|
||||
Set<RegionInfo> misplacedRegions = new HashSet<>();
|
||||
for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) {
|
||||
RegionInfo regionInfo = region.getKey();
|
||||
ServerName assignedServer = region.getValue();
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
|
||||
if (assignedServer == null) {
|
||||
LOG.debug("There is no assigned server for {}", region);
|
||||
continue;
|
||||
}
|
||||
RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
|
||||
if (info == null && otherInfo == null) {
|
||||
LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer);
|
||||
continue;
|
||||
}
|
||||
if ((info == null || !info.containsServer(assignedServer.getAddress()))) {
|
||||
LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() +
|
||||
" on server: " + assignedServer +
|
||||
" found in group: " + otherInfo +
|
||||
" outside of group: " + (info == null ? "UNKNOWN" : info.getName()));
|
||||
misplacedRegions.add(regionInfo);
|
||||
}
|
||||
}
|
||||
return misplacedRegions;
|
||||
}
|
||||
|
||||
private ServerName findServerForRegion(
|
||||
Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
|
||||
|
|
|
@ -222,20 +222,6 @@ public class TestRSGroupBasedLoadBalancer {
|
|||
assertClusterAsBalanced(loadMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMisplacedRegions() throws Exception {
|
||||
// Test case where region is not considered misplaced if RSGroupInfo cannot be determined
|
||||
Map<RegionInfo, ServerName> inputForTest = new HashMap<>();
|
||||
RegionInfo ri = RegionInfoBuilder.newBuilder(table0)
|
||||
.setStartKey(new byte[16])
|
||||
.setEndKey(new byte[16])
|
||||
.setSplit(false)
|
||||
.setRegionId(regionId++)
|
||||
.build();
|
||||
inputForTest.put(ri, servers.iterator().next());
|
||||
Set<RegionInfo> misplacedRegions = loadBalancer.getMisplacedRegions(inputForTest);
|
||||
assertFalse(misplacedRegions.contains(ri));
|
||||
}
|
||||
/**
|
||||
* Test the cluster startup bulk assignment which attempts to retain assignment info.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MediumTests.class })
|
||||
public class TestSCPWithReplicasWithRSGroup extends TestServerCrashProcedureWithReplicas {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSCPWithReplicasWithRSGroup.class);
|
||||
|
||||
|
||||
@Override
|
||||
protected void startMiniCluster() throws Exception {
|
||||
Configuration conf = this.util.getConfiguration();
|
||||
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class,
|
||||
LoadBalancer.class);
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
|
||||
this.util.startMiniCluster(4);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue