HBASE-23006 RSGroupBasedLoadBalancer should also try to place replicas for the same region to different region servers (#605)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2019-09-11 11:10:46 +08:00 committed by GitHub
parent 3cfcee9eda
commit b20d9b9d0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 102 additions and 117 deletions

View File

@ -98,6 +98,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>

View File

@ -170,23 +170,21 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
}
@Override
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
generateGroupMaps(regions, servers, regionMap, serverMap);
for (String groupKey : regionMap.keySet()) {
if (regionMap.get(groupKey).size() > 0) {
Map<ServerName, List<RegionInfo>> result =
this.internalBalancer.roundRobinAssignment(
regionMap.get(groupKey),
serverMap.get(groupKey));
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
.roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
if (result != null) {
if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
result.get(LoadBalancer.BOGUS_SERVER_NAME));
assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
.addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
} else {
assignments.putAll(result);
}
@ -197,14 +195,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
}
@Override
public Map<ServerName, List<RegionInfo>> retainAssignment(
Map<RegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
List<ServerName> servers) throws HBaseIOException {
try {
Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
Set<RegionInfo> misplacedRegions = getMisplacedRegions(regions);
for (RegionInfo region : regions.keySet()) {
if (!misplacedRegions.contains(region)) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
@ -212,9 +208,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
}
groupToRegion.put(groupName, region);
}
}
// Now the "groupToRegion" map has only the regions which have correct
// assignments.
for (String key : groupToRegion.keySet()) {
Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
List<RegionInfo> regionList = groupToRegion.get(key);
@ -224,8 +217,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
currentAssignmentMap.put(region, regions.get(region));
}
if (candidateList.size() > 0) {
assignments.putAll(this.internalBalancer.retainAssignment(
currentAssignmentMap, candidateList));
assignments
.putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No available servers to assign regions: {}",
@ -235,24 +228,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
.addAll(regionList);
}
}
for (RegionInfo region : misplacedRegions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
List<ServerName> candidateList = filterOfflineServers(info, servers);
ServerName server = this.internalBalancer.randomAssignment(region,
candidateList);
if (server != null) {
assignments.computeIfAbsent(server, s -> new ArrayList<>()).add(region);
} else {
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
.add(region);
}
}
return assignments;
} catch (IOException e) {
throw new HBaseIOException("Failed to do online retain assignment", e);
@ -269,11 +244,9 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
return this.internalBalancer.randomAssignment(region, filteredServers);
}
private void generateGroupMaps(
List<RegionInfo> regions,
List<ServerName> servers,
ListMultimap<String, RegionInfo> regionMap,
ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
throws HBaseIOException {
try {
for (RegionInfo region : regions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
@ -307,63 +280,26 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
/**
* Filter servers based on the online servers.
*
* @param servers
* the servers
* @param onlineServers
* List of servers which are online.
* <p/>
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
* its contains()'s time complexity as O(logn), which is good enough.
* <p/>
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
* needed.
* @param servers the servers
* @param onlineServers List of servers which are online.
* @return the list
*/
private List<ServerName> filterServers(Set<Address> servers,
List<ServerName> onlineServers) {
/**
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}),
* having its contains()'s time complexity as O(logn), which is good enough.
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain
* if needed. */
private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
ArrayList<ServerName> finalList = new ArrayList<>();
for (ServerName onlineServer : onlineServers) {
if (servers.contains(onlineServer.getAddress())) {
finalList.add(onlineServer);
}
}
return finalList;
}
@VisibleForTesting
public Set<RegionInfo> getMisplacedRegions(
Map<RegionInfo, ServerName> regions) throws IOException {
Set<RegionInfo> misplacedRegions = new HashSet<>();
for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) {
RegionInfo regionInfo = region.getKey();
ServerName assignedServer = region.getValue();
String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
if (assignedServer == null) {
LOG.debug("There is no assigned server for {}", region);
continue;
}
RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
if (info == null && otherInfo == null) {
LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer);
continue;
}
if ((info == null || !info.containsServer(assignedServer.getAddress()))) {
LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() +
" on server: " + assignedServer +
" found in group: " + otherInfo +
" outside of group: " + (info == null ? "UNKNOWN" : info.getName()));
misplacedRegions.add(regionInfo);
}
}
return misplacedRegions;
}
private Pair<Map<ServerName, List<RegionInfo>>, List<RegionPlan>> correctAssignments(
Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{
// To return

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
@ -131,21 +129,6 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
assertClusterAsBalanced(loadMap);
}
@Test
public void testGetMisplacedRegions() throws Exception {
// Test case where region is not considered misplaced if RSGroupInfo cannot be determined
Map<RegionInfo, ServerName> inputForTest = new HashMap<>();
RegionInfo ri = RegionInfoBuilder.newBuilder(table0)
.setStartKey(new byte[16])
.setEndKey(new byte[16])
.setSplit(false)
.setRegionId(regionId++)
.build();
inputForTest.put(ri, servers.iterator().next());
Set<RegionInfo> misplacedRegions = loadBalancer.getMisplacedRegions(inputForTest);
assertFalse(misplacedRegions.contains(ri));
}
/**
* Test the cluster startup bulk assignment which attempts to retain assignment info.
*/

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class })
public class TestSCPWithReplicasWithRSGroup extends TestSCPBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSCPWithReplicasWithRSGroup.class);
@Override
protected void setupConf(Configuration conf) {
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class,
LoadBalancer.class);
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
}
@Override
protected void startMiniCluster() throws Exception {
this.util.startMiniCluster(4);
}
@Override
protected int getRegionReplication() {
return 3;
}
@Test
public void testCrashTargetRs() throws Exception {
testRecoveryAndDoubleExecution(false, false);
}
}