HBASE-21102 ServerCrashProcedure should select target server where no
other replicas exist for the current region (Ram)
This commit is contained in:
parent
b83613fdce
commit
dc3ada2614
|
@ -466,25 +466,39 @@ public class RegionStates {
|
|||
public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
|
||||
final Collection<RegionInfo> regions) {
|
||||
final Map<ServerName, List<RegionInfo>> result = new HashMap<ServerName, List<RegionInfo>>();
|
||||
for (RegionInfo hri: regions) {
|
||||
final RegionStateNode node = getRegionStateNode(hri);
|
||||
if (node == null) continue;
|
||||
|
||||
// TODO: State.OPEN
|
||||
final ServerName serverName = node.getRegionLocation();
|
||||
if (serverName == null) continue;
|
||||
|
||||
List<RegionInfo> serverRegions = result.get(serverName);
|
||||
if (serverRegions == null) {
|
||||
serverRegions = new ArrayList<RegionInfo>();
|
||||
result.put(serverName, serverRegions);
|
||||
if (regions != null) {
|
||||
for (RegionInfo hri : regions) {
|
||||
final RegionStateNode node = getRegionStateNode(hri);
|
||||
if (node == null) {
|
||||
continue;
|
||||
}
|
||||
createSnapshot(node, result);
|
||||
}
|
||||
} else {
|
||||
for (RegionStateNode node : regionsMap.values()) {
|
||||
if (node == null) {
|
||||
continue;
|
||||
}
|
||||
createSnapshot(node, result);
|
||||
}
|
||||
|
||||
serverRegions.add(node.getRegionInfo());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void createSnapshot(RegionStateNode node, Map<ServerName, List<RegionInfo>> result) {
|
||||
final ServerName serverName = node.getRegionLocation();
|
||||
if (serverName == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<RegionInfo> serverRegions = result.get(serverName);
|
||||
if (serverRegions == null) {
|
||||
serverRegions = new ArrayList<RegionInfo>();
|
||||
result.put(serverName, serverRegions);
|
||||
}
|
||||
serverRegions.add(node.getRegionInfo());
|
||||
}
|
||||
|
||||
public Map<RegionInfo, ServerName> getRegionAssignments() {
|
||||
final HashMap<RegionInfo, ServerName> assignments = new HashMap<RegionInfo, ServerName>();
|
||||
for (RegionStateNode node: regionsMap.values()) {
|
||||
|
|
|
@ -741,7 +741,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
int region = regionsToIndex.get(regionInfo);
|
||||
|
||||
int primary = regionIndexToPrimaryIndex[region];
|
||||
|
||||
// there is a subset relation for server < host < rack
|
||||
// check server first
|
||||
|
||||
|
@ -1262,7 +1261,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
return assignments;
|
||||
}
|
||||
|
||||
Cluster cluster = createCluster(servers, regions);
|
||||
Cluster cluster = createCluster(servers, regions, false);
|
||||
List<RegionInfo> unassignedRegions = new ArrayList<>();
|
||||
|
||||
roundRobinAssignment(cluster, regions, unassignedRegions,
|
||||
|
@ -1318,12 +1317,19 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
return assignments;
|
||||
}
|
||||
|
||||
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions) {
|
||||
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
|
||||
boolean hasRegionReplica) {
|
||||
// Get the snapshot of the current assignments for the regions in question, and then create
|
||||
// a cluster out of it. Note that we might have replicas already assigned to some servers
|
||||
// earlier. So we want to get the snapshot to see those assignments, but this will only contain
|
||||
// replicas of the regions that are passed (for performance).
|
||||
Map<ServerName, List<RegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
|
||||
Map<ServerName, List<RegionInfo>> clusterState = null;
|
||||
if (!hasRegionReplica) {
|
||||
clusterState = getRegionAssignmentsByServer(regions);
|
||||
} else {
|
||||
// for the case where we have region replica it is better we get the entire cluster's snapshot
|
||||
clusterState = getRegionAssignmentsByServer(null);
|
||||
}
|
||||
|
||||
for (ServerName server : servers) {
|
||||
if (!clusterState.containsKey(server)) {
|
||||
|
@ -1372,7 +1378,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
final List<ServerName> finalServers = idleServers.isEmpty() ?
|
||||
servers : idleServers;
|
||||
List<RegionInfo> regions = Lists.newArrayList(regionInfo);
|
||||
Cluster cluster = createCluster(finalServers, regions);
|
||||
Cluster cluster = createCluster(finalServers, regions, false);
|
||||
return randomAssignment(cluster, regionInfo, finalServers);
|
||||
}
|
||||
|
||||
|
@ -1444,10 +1450,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
int numRandomAssignments = 0;
|
||||
int numRetainedAssigments = 0;
|
||||
|
||||
boolean hasRegionReplica = false;
|
||||
for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
|
||||
RegionInfo region = entry.getKey();
|
||||
ServerName oldServerName = entry.getValue();
|
||||
if (!hasRegionReplica && !RegionReplicaUtil.isDefaultReplica(region)) {
|
||||
hasRegionReplica = true;
|
||||
}
|
||||
List<ServerName> localServers = new ArrayList<>();
|
||||
if (oldServerName != null) {
|
||||
localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
|
||||
|
@ -1487,7 +1496,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
// If servers from prior assignment aren't present, then lets do randomAssignment on regions.
|
||||
if (randomAssignRegions.size() > 0) {
|
||||
Cluster cluster = createCluster(servers, regions.keySet());
|
||||
Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
|
||||
ServerName sn = entry.getKey();
|
||||
for (RegionInfo region : entry.getValue()) {
|
||||
|
@ -1497,7 +1506,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
for (RegionInfo region : randomAssignRegions) {
|
||||
ServerName target = randomAssignment(cluster, region, servers);
|
||||
assignments.get(target).add(region);
|
||||
cluster.doAssignRegion(region, target);
|
||||
numRandomAssignments++;
|
||||
}
|
||||
}
|
||||
|
@ -1548,12 +1556,28 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
ServerName sn = null;
|
||||
final int maxIterations = numServers * 4;
|
||||
int iterations = 0;
|
||||
|
||||
List<ServerName> usedSNs = new ArrayList<>(servers.size());
|
||||
do {
|
||||
int i = RANDOM.nextInt(numServers);
|
||||
sn = servers.get(i);
|
||||
if (!usedSNs.contains(sn)) {
|
||||
usedSNs.add(sn);
|
||||
} while (cluster.wouldLowerAvailability(regionInfo, sn)
|
||||
&& iterations++ < maxIterations);
|
||||
if (iterations >= maxIterations) {
|
||||
// We have reached the max. Means the servers that we collected is still lowering the
|
||||
// availability
|
||||
for (ServerName unusedServer : servers) {
|
||||
if (!usedSNs.contains(unusedServer)) {
|
||||
// check if any other unused server is there for us to use.
|
||||
// If so use it. Else we have not other go but to go with one of them
|
||||
if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
|
||||
sn = unusedServer;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
cluster.doAssignRegion(regionInfo, sn);
|
||||
return sn;
|
||||
}
|
||||
|
|
|
@ -1470,7 +1470,22 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
*/
|
||||
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
|
||||
throws IOException {
|
||||
return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
|
||||
return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a table.
|
||||
* @param tableName the table name
|
||||
* @param families the families
|
||||
* @param splitKeys the splitkeys
|
||||
* @param replicaCount the region replica count
|
||||
* @return A Table instance for the created table.
|
||||
* @throws IOException throws IOException
|
||||
*/
|
||||
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
|
||||
int replicaCount) throws IOException {
|
||||
return createTable(tableName, families, splitKeys, replicaCount,
|
||||
new Configuration(getConfiguration()));
|
||||
}
|
||||
|
||||
public Table createTable(TableName tableName, byte[][] families,
|
||||
|
@ -1561,16 +1576,19 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
|
||||
/**
|
||||
* Create a table.
|
||||
* @param tableName
|
||||
* @param families
|
||||
* @param splitKeys
|
||||
* @param tableName the table name
|
||||
* @param families the families
|
||||
* @param splitKeys the split keys
|
||||
* @param replicaCount the replica count
|
||||
* @param c Configuration to use
|
||||
* @return A Table instance for the created table.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
|
||||
final Configuration c) throws IOException {
|
||||
return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
|
||||
int replicaCount, final Configuration c) throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.setRegionReplication(replicaCount);
|
||||
return createTable(htd, families, splitKeys, c);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.procedure;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -52,7 +54,7 @@ public class TestServerCrashProcedure {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestServerCrashProcedure.class);
|
||||
|
||||
private HBaseTestingUtility util;
|
||||
protected HBaseTestingUtility util;
|
||||
|
||||
private ProcedureMetrics serverCrashProcMetrics;
|
||||
private long serverCrashSubmittedCount = 0;
|
||||
|
@ -68,13 +70,17 @@ public class TestServerCrashProcedure {
|
|||
public void setup() throws Exception {
|
||||
this.util = new HBaseTestingUtility();
|
||||
setupConf(this.util.getConfiguration());
|
||||
this.util.startMiniCluster(3);
|
||||
startMiniCluster();
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
|
||||
this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
|
||||
serverCrashProcMetrics = this.util.getHBaseCluster().getMaster().getMasterMetrics()
|
||||
.getServerCrashProcMetrics();
|
||||
}
|
||||
|
||||
protected void startMiniCluster() throws Exception {
|
||||
this.util.startMiniCluster(3);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
MiniHBaseCluster cluster = this.util.getHBaseCluster();
|
||||
|
@ -113,11 +119,9 @@ public class TestServerCrashProcedure {
|
|||
*/
|
||||
private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution)
|
||||
throws Exception {
|
||||
final TableName tableName = TableName.valueOf(
|
||||
"testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta);
|
||||
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
|
||||
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
try {
|
||||
final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution-carryingMeta-"
|
||||
+ carryingMeta + "-doubleExecution-" + doubleExecution);
|
||||
try (Table t = createTable(tableName)) {
|
||||
// Load the table with a bit of data so some logs to split and some edits in each region.
|
||||
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
|
||||
final int count = util.countRows(t);
|
||||
|
@ -155,17 +159,25 @@ public class TestServerCrashProcedure {
|
|||
long procId = getSCPProcId(procExec);
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
||||
}
|
||||
// Assert all data came back.
|
||||
assertReplicaDistributed(t);
|
||||
assertEquals(count, util.countRows(t));
|
||||
assertEquals(checksum, util.checksumRows(t));
|
||||
} catch(Throwable throwable) {
|
||||
} catch (Throwable throwable) {
|
||||
LOG.error("Test failed!", throwable);
|
||||
throw throwable;
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertReplicaDistributed(final Table t) {
|
||||
return;
|
||||
}
|
||||
|
||||
protected Table createTable(final TableName tableName) throws IOException {
|
||||
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
|
||||
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
return t;
|
||||
}
|
||||
|
||||
private void collectMasterMetrics() {
|
||||
serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount();
|
||||
serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestServerCrashProcedureWithReplicas extends TestServerCrashProcedure {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestServerCrashProcedureWithReplicas.class);
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestServerCrashProcedureWithReplicas.class);
|
||||
|
||||
@Override
|
||||
protected void startMiniCluster() throws Exception {
|
||||
// Start a cluster with 4 nodes because we have 3 replicas.
|
||||
// So on a crash of a server still we can ensure that the
|
||||
// replicas are distributed.
|
||||
this.util.startMiniCluster(4);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Table createTable(final TableName tableName) throws IOException {
|
||||
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
|
||||
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, 3);
|
||||
return t;
|
||||
}
|
||||
|
||||
protected void assertReplicaDistributed(final Table t) {
|
||||
// Assert all data came back.
|
||||
List<RegionInfo> regionInfos = new ArrayList<>();
|
||||
for (RegionServerThread rs : this.util.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
regionInfos.clear();
|
||||
for (Region r : rs.getRegionServer().getRegions(t.getName())) {
|
||||
LOG.info("The region is " + r.getRegionInfo() + " the location is "
|
||||
+ rs.getRegionServer().getServerName());
|
||||
if (contains(regionInfos, r.getRegionInfo())) {
|
||||
LOG.error("Am exiting");
|
||||
fail("Crashed replica regions should not be assigned to same region server");
|
||||
} else {
|
||||
regionInfos.add(r.getRegionInfo());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
|
||||
for (RegionInfo info : regionInfos) {
|
||||
if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue