HBASE-21102 ServerCrashProcedure should select target server where no

other replicas exist for the current region (Ram)
This commit is contained in:
Vasudevan 2018-09-17 22:36:50 +05:30
parent 39e0b8515f
commit 27b772ddc6
5 changed files with 224 additions and 44 deletions

View File

@ -41,12 +41,14 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -820,25 +822,39 @@ public class RegionStates {
public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
final Collection<RegionInfo> regions) {
final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>();
for (RegionInfo hri: regions) {
final RegionStateNode node = getRegionStateNode(hri);
if (node == null) continue;
// TODO: State.OPEN
final ServerName serverName = node.getRegionLocation();
if (serverName == null) continue;
List<RegionInfo> serverRegions = result.get(serverName);
if (serverRegions == null) {
serverRegions = new ArrayList<RegionInfo>();
result.put(serverName, serverRegions);
if (regions != null) {
for (RegionInfo hri : regions) {
final RegionStateNode node = getRegionStateNode(hri);
if (node == null) {
continue;
}
createSnapshot(node, result);
}
} else {
for (RegionStateNode node : regionsMap.values()) {
if (node == null) {
continue;
}
createSnapshot(node, result);
}
serverRegions.add(node.getRegionInfo());
}
return result;
}
private void createSnapshot(RegionStateNode node, Map<ServerName, List<RegionInfo>> result) {
final ServerName serverName = node.getRegionLocation();
if (serverName == null) {
return;
}
List<RegionInfo> serverRegions = result.get(serverName);
if (serverRegions == null) {
serverRegions = new ArrayList<RegionInfo>();
result.put(serverName, serverRegions);
}
serverRegions.add(node.getRegionInfo());
}
public Map<RegionInfo, ServerName> getRegionAssignments() {
final HashMap<RegionInfo, ServerName> assignments = new HashMap<RegionInfo, ServerName>();
for (RegionStateNode node: regionsMap.values()) {
@ -1127,6 +1143,26 @@ public class RegionStates {
return serverNode;
}
public boolean isReplicaAvailableForRegion(final RegionInfo info) {
// if the region info itself is a replica return true.
if (!RegionReplicaUtil.isDefaultReplica(info)) {
return true;
}
// iterate the regionsMap for the given region name. If there are replicas it should
// list them in order.
for (RegionStateNode node : regionsMap.tailMap(info.getRegionName()).values()) {
if (!node.getTable().equals(info.getTable())
|| !ServerRegionReplicaUtil.isReplicasForSameRegion(info, node.getRegionInfo())) {
break;
} else if (!RegionReplicaUtil.isDefaultReplica(node.getRegionInfo())) {
// we have replicas
return true;
}
}
// we don have replicas
return false;
}
public ServerStateNode removeRegionFromServer(final ServerName serverName,
final RegionStateNode regionNode) {
ServerStateNode serverNode = getOrCreateServer(serverName);

View File

@ -34,6 +34,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@ -51,15 +52,14 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The base class for load balancers. It provides the the functions used to by
@ -741,7 +741,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int region = regionsToIndex.get(regionInfo);
int primary = regionIndexToPrimaryIndex[region];
// there is a subset relation for server < host < rack
// check server first
@ -1262,7 +1261,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return assignments;
}
Cluster cluster = createCluster(servers, regions);
Cluster cluster = createCluster(servers, regions, false);
List<RegionInfo> unassignedRegions = new ArrayList<>();
roundRobinAssignment(cluster, regions, unassignedRegions,
@ -1318,12 +1317,19 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return assignments;
}
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions) {
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
boolean hasRegionReplica) {
// Get the snapshot of the current assignments for the regions in question, and then create
// a cluster out of it. Note that we might have replicas already assigned to some servers
// earlier. So we want to get the snapshot to see those assignments, but this will only contain
// replicas of the regions that are passed (for performance).
Map<ServerName, List<RegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
Map<ServerName, List<RegionInfo>> clusterState = null;
if (!hasRegionReplica) {
clusterState = getRegionAssignmentsByServer(regions);
} else {
// for the case where we have region replica it is better we get the entire cluster's snapshot
clusterState = getRegionAssignmentsByServer(null);
}
for (ServerName server : servers) {
if (!clusterState.containsKey(server)) {
@ -1372,7 +1378,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
final List<ServerName> finalServers = idleServers.isEmpty() ?
servers : idleServers;
List<RegionInfo> regions = Lists.newArrayList(regionInfo);
Cluster cluster = createCluster(finalServers, regions);
Cluster cluster = createCluster(finalServers, regions, false);
return randomAssignment(cluster, regionInfo, finalServers);
}
@ -1444,10 +1450,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
boolean hasRegionReplica = false;
for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
RegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
// In the current set of regions even if one has region replica let us go with
// getting the entire snapshot
if (this.services != null && this.services.getAssignmentManager() != null) { // for tests
if (!hasRegionReplica && this.services.getAssignmentManager().getRegionStates()
.isReplicaAvailableForRegion(region)) {
hasRegionReplica = true;
}
}
List<ServerName> localServers = new ArrayList<>();
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
@ -1487,7 +1501,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// If servers from prior assignment aren't present, then lets do randomAssignment on regions.
if (randomAssignRegions.size() > 0) {
Cluster cluster = createCluster(servers, regions.keySet());
Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
ServerName sn = entry.getKey();
for (RegionInfo region : entry.getValue()) {
@ -1497,7 +1511,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (RegionInfo region : randomAssignRegions) {
ServerName target = randomAssignment(cluster, region, servers);
assignments.get(target).add(region);
cluster.doAssignRegion(region, target);
numRandomAssignments++;
}
}
@ -1548,12 +1561,29 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
ServerName sn = null;
final int maxIterations = numServers * 4;
int iterations = 0;
List<ServerName> usedSNs = new ArrayList<>(servers.size());
do {
int i = RANDOM.nextInt(numServers);
sn = servers.get(i);
if (!usedSNs.contains(sn)) {
usedSNs.add(sn);
}
} while (cluster.wouldLowerAvailability(regionInfo, sn)
&& iterations++ < maxIterations);
if (iterations >= maxIterations) {
// We have reached the max. Means the servers that we collected is still lowering the
// availability
for (ServerName unusedServer : servers) {
if (!usedSNs.contains(unusedServer)) {
// check if any other unused server is there for us to use.
// If so use it. Else we have not other go but to go with one of them
if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
sn = unusedServer;
break;
}
}
}
}
cluster.doAssignRegion(regionInfo, sn);
return sn;
}

View File

@ -1345,7 +1345,22 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
*/
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
throws IOException {
return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
}
/**
* Create a table.
* @param tableName the table name
* @param families the families
* @param splitKeys the splitkeys
* @param replicaCount the region replica count
* @return A Table instance for the created table.
* @throws IOException throws IOException
*/
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
int replicaCount) throws IOException {
return createTable(tableName, families, splitKeys, replicaCount,
new Configuration(getConfiguration()));
}
public Table createTable(TableName tableName, byte[][] families,
@ -1436,16 +1451,19 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
/**
* Create a table.
* @param tableName
* @param families
* @param splitKeys
* @param tableName the table name
* @param families the families
* @param splitKeys the split keys
* @param replicaCount the replica count
* @param c Configuration to use
* @return A Table instance for the created table.
* @throws IOException
*/
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
final Configuration c) throws IOException {
return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
int replicaCount, final Configuration c) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.setRegionReplication(replicaCount);
return createTable(htd, families, splitKeys, c);
}
/**

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -52,7 +54,7 @@ public class TestServerCrashProcedure {
private static final Logger LOG = LoggerFactory.getLogger(TestServerCrashProcedure.class);
private HBaseTestingUtility util;
protected HBaseTestingUtility util;
private ProcedureMetrics serverCrashProcMetrics;
private long serverCrashSubmittedCount = 0;
@ -68,13 +70,17 @@ public class TestServerCrashProcedure {
public void setup() throws Exception {
this.util = new HBaseTestingUtility();
setupConf(this.util.getConfiguration());
this.util.startMiniCluster(3);
startMiniCluster();
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
serverCrashProcMetrics = this.util.getHBaseCluster().getMaster().getMasterMetrics()
.getServerCrashProcMetrics();
}
protected void startMiniCluster() throws Exception {
this.util.startMiniCluster(3);
}
@After
public void tearDown() throws Exception {
MiniHBaseCluster cluster = this.util.getHBaseCluster();
@ -113,11 +119,9 @@ public class TestServerCrashProcedure {
*/
private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution)
throws Exception {
final TableName tableName = TableName.valueOf(
"testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta);
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
try {
final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution-carryingMeta-"
+ carryingMeta + "-doubleExecution-" + doubleExecution);
try (Table t = createTable(tableName)) {
// Load the table with a bit of data so some logs to split and some edits in each region.
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
final int count = util.countRows(t);
@ -151,17 +155,25 @@ public class TestServerCrashProcedure {
long procId = getSCPProcId(procExec);
ProcedureTestingUtility.waitProcedure(procExec, procId);
}
// Assert all data came back.
assertReplicaDistributed(t);
assertEquals(count, util.countRows(t));
assertEquals(checksum, util.checksumRows(t));
} catch(Throwable throwable) {
} catch (Throwable throwable) {
LOG.error("Test failed!", throwable);
throw throwable;
} finally {
t.close();
}
}
protected void assertReplicaDistributed(final Table t) {
return;
}
protected Table createTable(final TableName tableName) throws IOException {
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
return t;
}
private void collectMasterMetrics() {
serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount();
serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ MasterTests.class, LargeTests.class })
public class TestServerCrashProcedureWithReplicas extends TestServerCrashProcedure {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestServerCrashProcedureWithReplicas.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestServerCrashProcedureWithReplicas.class);
@Override
protected void startMiniCluster() throws Exception {
// Start a cluster with 4 nodes because we have 3 replicas.
// So on a crash of a server still we can ensure that the
// replicas are distributed.
this.util.startMiniCluster(4);
}
@Override
protected Table createTable(final TableName tableName) throws IOException {
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, 3);
return t;
}
protected void assertReplicaDistributed(final Table t) {
// Assert all data came back.
List<RegionInfo> regionInfos = new ArrayList<>();
for (RegionServerThread rs : this.util.getMiniHBaseCluster().getRegionServerThreads()) {
regionInfos.clear();
for (Region r : rs.getRegionServer().getRegions(t.getName())) {
LOG.info("The region is " + r.getRegionInfo() + " the location is "
+ rs.getRegionServer().getServerName());
if (contains(regionInfos, r.getRegionInfo())) {
LOG.error("Am exiting");
fail("Crashed replica regions should not be assigned to same region server");
} else {
regionInfos.add(r.getRegionInfo());
}
}
}
}
private boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
for (RegionInfo info : regionInfos) {
if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
return true;
}
}
return false;
}
}