HBASE-24760 Add a config hbase.rsgroup.fallback.enable for RSGroup fallback feature (#2149)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
047e0618d2
commit
7909e29de5
|
@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.rsgroup;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -51,7 +49,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
|
@ -81,15 +78,15 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
private LoadBalancer internalBalancer;
|
||||
|
||||
/**
|
||||
* Define the config key of fallback groups
|
||||
* Enabled only if this property is set
|
||||
* Set this key to {@code true} to allow region fallback.
|
||||
* Fallback to the default rsgroup first, then fallback to any group if no online servers in
|
||||
* default rsgroup.
|
||||
* Please keep balancer switch on at the same time, which is relied on to correct misplaced
|
||||
* regions
|
||||
*/
|
||||
public static final String FALLBACK_GROUPS_KEY = "hbase.rsgroup.fallback.groups";
|
||||
public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
|
||||
|
||||
private boolean fallbackEnabled = false;
|
||||
private Set<String> fallbackGroups;
|
||||
|
||||
/**
|
||||
* Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
|
||||
|
@ -180,22 +177,14 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
|
||||
List<RegionInfo> regions, List<ServerName> servers) throws IOException {
|
||||
Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
|
||||
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
|
||||
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
|
||||
generateGroupMaps(regions, servers, regionMap, serverMap);
|
||||
for (String groupKey : regionMap.keySet()) {
|
||||
if (regionMap.get(groupKey).size() > 0) {
|
||||
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
|
||||
.roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
|
||||
if (result != null) {
|
||||
if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
|
||||
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
|
||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
|
||||
.addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
|
||||
} else {
|
||||
assignments.putAll(result);
|
||||
}
|
||||
}
|
||||
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
|
||||
generateGroupAssignments(regions, servers);
|
||||
for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
|
||||
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
|
||||
.roundRobinAssignment(pair.getFirst(), pair.getSecond());
|
||||
if (result != null) {
|
||||
result.forEach((server, regionInfos) ->
|
||||
assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
|
||||
}
|
||||
}
|
||||
return assignments;
|
||||
|
@ -206,36 +195,16 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
List<ServerName> servers) throws HBaseIOException {
|
||||
try {
|
||||
Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
|
||||
ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
|
||||
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
|
||||
for (RegionInfo region : regions.keySet()) {
|
||||
String groupName =
|
||||
RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
|
||||
.orElse(defaultInfo).getName();
|
||||
groupToRegion.put(groupName, region);
|
||||
}
|
||||
for (String group : groupToRegion.keySet()) {
|
||||
Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
|
||||
List<RegionInfo> regionList = groupToRegion.get(group);
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(group);
|
||||
List<ServerName> candidateList = filterOfflineServers(info, servers);
|
||||
if (fallbackEnabled && candidateList.isEmpty()) {
|
||||
candidateList = getFallBackCandidates(servers);
|
||||
}
|
||||
for (RegionInfo region : regionList) {
|
||||
currentAssignmentMap.put(region, regions.get(region));
|
||||
}
|
||||
if (candidateList.size() > 0) {
|
||||
assignments
|
||||
.putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No available servers for group {} to assign regions: {}", group,
|
||||
RegionInfo.getShortNameToLog(regionList));
|
||||
}
|
||||
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
|
||||
.addAll(regionList);
|
||||
}
|
||||
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
|
||||
generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
|
||||
for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
|
||||
List<RegionInfo> regionList = pair.getFirst();
|
||||
Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
|
||||
regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
|
||||
Map<ServerName, List<RegionInfo>> pairResult =
|
||||
this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
|
||||
pairResult.forEach((server, rs) ->
|
||||
assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
|
||||
}
|
||||
return assignments;
|
||||
} catch (IOException e) {
|
||||
|
@ -246,17 +215,17 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
@Override
|
||||
public ServerName randomAssignment(RegionInfo region,
|
||||
List<ServerName> servers) throws IOException {
|
||||
ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
|
||||
ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
|
||||
generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
|
||||
List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
|
||||
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
|
||||
generateGroupAssignments(Lists.newArrayList(region), servers);
|
||||
List<ServerName> filteredServers = pairs.iterator().next().getSecond();
|
||||
return this.internalBalancer.randomAssignment(region, filteredServers);
|
||||
}
|
||||
|
||||
private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
|
||||
ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
|
||||
throws HBaseIOException {
|
||||
private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
|
||||
List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
|
||||
try {
|
||||
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
|
||||
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
|
||||
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
|
||||
for (RegionInfo region : regions) {
|
||||
String groupName =
|
||||
|
@ -267,15 +236,29 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
for (String groupKey : regionMap.keySet()) {
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
|
||||
serverMap.putAll(groupKey, filterOfflineServers(info, servers));
|
||||
if (fallbackEnabled && serverMap.get(groupKey).isEmpty()) {
|
||||
serverMap.putAll(groupKey, getFallBackCandidates(servers));
|
||||
}
|
||||
}
|
||||
|
||||
List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
|
||||
List<RegionInfo> fallbackRegions = Lists.newArrayList();
|
||||
for (String groupKey : regionMap.keySet()) {
|
||||
if (serverMap.get(groupKey).isEmpty()) {
|
||||
serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
|
||||
fallbackRegions.addAll(regionMap.get(groupKey));
|
||||
} else {
|
||||
result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
|
||||
}
|
||||
}
|
||||
if (!fallbackRegions.isEmpty()) {
|
||||
List<ServerName> candidates = null;
|
||||
if (fallbackEnabled) {
|
||||
candidates = getFallBackCandidates(servers);
|
||||
}
|
||||
candidates = (candidates == null || candidates.isEmpty()) ?
|
||||
Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
|
||||
result.add(Pair.newPair(fallbackRegions, candidates));
|
||||
}
|
||||
return result;
|
||||
} catch(IOException e) {
|
||||
throw new HBaseIOException("Failed to generate group maps", e);
|
||||
throw new HBaseIOException("Failed to generate group assignments", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -390,11 +373,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
internalBalancer.initialize();
|
||||
// init fallback groups
|
||||
Collection<String> groups = config.getTrimmedStringCollection(FALLBACK_GROUPS_KEY);
|
||||
if (groups != null && !groups.isEmpty()) {
|
||||
this.fallbackEnabled = true;
|
||||
this.fallbackGroups = new HashSet<>(groups);
|
||||
}
|
||||
this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
|
||||
}
|
||||
|
||||
public boolean isOnline() {
|
||||
|
@ -485,15 +464,13 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
|
||||
List<ServerName> serverNames = new ArrayList<>();
|
||||
for (String fallbackGroup : fallbackGroups) {
|
||||
try {
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(fallbackGroup);
|
||||
serverNames.addAll(filterOfflineServers(info, servers));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Get group info for {} failed", fallbackGroup, e);
|
||||
}
|
||||
List<ServerName> serverNames = null;
|
||||
try {
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
|
||||
serverNames = filterOfflineServers(info, servers);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to get default rsgroup info to fallback", e);
|
||||
}
|
||||
return serverNames;
|
||||
return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -206,6 +206,12 @@ public abstract class TestRSGroupsBase {
|
|||
}
|
||||
}
|
||||
ADMIN.setRSGroup(tables, RSGroupInfo.DEFAULT_GROUP);
|
||||
for (NamespaceDescriptor nd : ADMIN.listNamespaceDescriptors()) {
|
||||
if (groupName.equals(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))) {
|
||||
nd.removeConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
|
||||
ADMIN.modifyNamespace(nd);
|
||||
}
|
||||
}
|
||||
RSGroupInfo groupInfo = ADMIN.getRSGroup(groupName);
|
||||
ADMIN.moveServersToRSGroup(groupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
|
||||
ADMIN.removeRSGroup(groupName);
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.Collections;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.net.Address;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RSGroupTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -56,8 +59,8 @@ public class TestRSGroupsFallback extends TestRSGroupsBase {
|
|||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Configuration configuration = TEST_UTIL.getConfiguration();
|
||||
configuration.set(RSGroupBasedLoadBalancer.FALLBACK_GROUPS_KEY, FALLBACK_GROUP);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
|
||||
setUpTestBeforeClass();
|
||||
MASTER.balanceSwitch(true);
|
||||
}
|
||||
|
@ -78,51 +81,57 @@ public class TestRSGroupsFallback extends TestRSGroupsBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGroupFallback() throws Exception {
|
||||
public void testFallback() throws Exception {
|
||||
// add fallback group
|
||||
addGroup(FALLBACK_GROUP, 1);
|
||||
// add test group
|
||||
String groupName = getGroupName(name.getMethodName());
|
||||
addGroup(groupName, 1);
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
|
||||
.setRegionServerGroup(groupName)
|
||||
.build();
|
||||
ADMIN.createTable(desc);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
// server of test group crash
|
||||
for (Address server : ADMIN.getRSGroup(groupName).getServers()) {
|
||||
AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true);
|
||||
}
|
||||
Threads.sleep(1000);
|
||||
TEST_UTIL.waitUntilNoRegionsInTransition(10000);
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
|
||||
.setRegionServerGroup(groupName)
|
||||
.build();
|
||||
ADMIN.createTable(desc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
// server of test group crash, regions move to default group
|
||||
crashRsInGroup(groupName);
|
||||
assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
|
||||
|
||||
// regions move to fallback group
|
||||
assertRegionsInGroup(FALLBACK_GROUP);
|
||||
// server of default group crash, regions move to any other group
|
||||
crashRsInGroup(RSGroupInfo.DEFAULT_GROUP);
|
||||
assertRegionsInGroup(tableName, FALLBACK_GROUP);
|
||||
|
||||
// move a new server from default group
|
||||
Address address = ADMIN.getRSGroup(RSGroupInfo.DEFAULT_GROUP).getServers().first();
|
||||
ADMIN.moveServersToRSGroup(Collections.singleton(address), groupName);
|
||||
|
||||
// correct misplaced regions
|
||||
// add a new server to default group, regions move to default group
|
||||
TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
|
||||
MASTER.balance();
|
||||
assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
|
||||
|
||||
TEST_UTIL.waitUntilNoRegionsInTransition(10000);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
|
||||
// regions move back
|
||||
assertRegionsInGroup(groupName);
|
||||
// add a new server to test group, regions move back
|
||||
JVMClusterUtil.RegionServerThread t =
|
||||
TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
|
||||
ADMIN.moveServersToRSGroup(
|
||||
Collections.singleton(t.getRegionServer().getServerName().getAddress()), groupName);
|
||||
MASTER.balance();
|
||||
assertRegionsInGroup(tableName, groupName);
|
||||
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
|
||||
private void assertRegionsInGroup(String group) throws IOException {
|
||||
RSGroupInfo fallbackGroup = ADMIN.getRSGroup(group);
|
||||
MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName).forEach(region -> {
|
||||
private void assertRegionsInGroup(TableName table, String group) throws IOException {
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(table);
|
||||
RSGroupInfo rsGroup = ADMIN.getRSGroup(group);
|
||||
MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(table).forEach(region -> {
|
||||
Address regionOnServer = MASTER.getAssignmentManager().getRegionStates()
|
||||
.getRegionAssignments().get(region).getAddress();
|
||||
assertTrue(fallbackGroup.getServers().contains(regionOnServer));
|
||||
assertTrue(rsGroup.getServers().contains(regionOnServer));
|
||||
});
|
||||
}
|
||||
|
||||
private void crashRsInGroup(String groupName) throws Exception {
|
||||
for (Address server : ADMIN.getRSGroup(groupName).getServers()) {
|
||||
AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true);
|
||||
}
|
||||
Threads.sleep(1000);
|
||||
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue