HBASE-24900 Make retain assignment configurable during SCP (#2313)

Retain assignment will be useful in non-cloud scenario where RegionServer and Datanode are deployed in same machine and will avoid remote read.

Signed-off-by: Guanghao Zhang <zghao@apache.org> 
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
This commit is contained in:
Pankaj 2021-02-01 10:37:57 +05:30 committed by GitHub
parent 1765ff7f76
commit a04ea7ea44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 300 additions and 10 deletions

View File

@ -414,13 +414,8 @@ public class TransitRegionStateProcedure
// Should be called with RegionStateNode locked
public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName) throws IOException {
// force to assign to a new candidate server
// AssignmentManager#regionClosedAbnormally will set region location to null
// TODO: the forceNewPlan flag not be persistent so if master crash then the flag will be lost.
// But assign to old server is not big deal because it not effect correctness.
// See HBASE-23035 for more details.
forceNewPlan = true;
ServerName serverName, boolean forceNewPlan) throws IOException {
this.forceNewPlan = forceNewPlan;
if (remoteProc != null) {
// this means we are waiting for the sub procedure, so wake it up
remoteProc.serverCrashed(env, regionNode, serverName);

View File

@ -65,6 +65,21 @@ public class ServerCrashProcedure
implements ServerProcedureInterface {
private static final Logger LOG = LoggerFactory.getLogger(ServerCrashProcedure.class);
/**
* Configuration parameter to enable/disable the retain region assignment during
* ServerCrashProcedure.
* <p>
* By default retain assignment is disabled which makes the failover faster and improve the
* availability; useful for cloud scenario where region block locality is not important. Enable
* this when RegionServers are deployed on same host where Datanode are running, this will improve
* read performance due to local read.
* <p>
* see HBASE-24900 for more details.
*/
public static final String MASTER_SCP_RETAIN_ASSIGNMENT = "hbase.master.scp.retain.assignment";
/** Default value of {@link #MASTER_SCP_RETAIN_ASSIGNMENT} */
public static final boolean DEFAULT_MASTER_SCP_RETAIN_ASSIGNMENT = false;
/**
* Name of the crashed server to process.
*/
@ -486,6 +501,8 @@ public class ServerCrashProcedure
*/
private void assignRegions(MasterProcedureEnv env, List<RegionInfo> regions) throws IOException {
AssignmentManager am = env.getMasterServices().getAssignmentManager();
boolean retainAssignment = env.getMasterConfiguration().getBoolean(MASTER_SCP_RETAIN_ASSIGNMENT,
DEFAULT_MASTER_SCP_RETAIN_ASSIGNMENT);
for (RegionInfo region : regions) {
RegionStateNode regionNode = am.getRegionStates().getOrCreateRegionStateNode(region);
regionNode.lock();
@ -512,7 +529,8 @@ public class ServerCrashProcedure
}
if (regionNode.getProcedure() != null) {
LOG.info("{} found RIT {}; {}", this, regionNode.getProcedure(), regionNode);
regionNode.getProcedure().serverCrashed(env, regionNode, getServerName());
regionNode.getProcedure().serverCrashed(env, regionNode, getServerName(),
!retainAssignment);
continue;
}
if (env.getMasterServices().getTableStateManager()
@ -531,9 +549,8 @@ public class ServerCrashProcedure
LOG.warn("Found table disabled for region {}, procDetails: {}", regionNode, this);
continue;
}
// force to assign to a new candidate server, see HBASE-23035 for more details.
TransitRegionStateProcedure proc =
TransitRegionStateProcedure.assign(env, region, true, null);
TransitRegionStateProcedure.assign(env, region, !retainAssignment, null);
regionNode.setProcedure(proc);
addChildProcedure(proc);
} finally {

View File

@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ MasterTests.class, MediumTests.class })
public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRetainAssignmentOnRestart.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);
private static int NUM_OF_RS = 3;
@Override
protected boolean splitWALCoordinatedByZk() {
return true;
}
/**
* This tests retaining assignments on a cluster restart
*/
@Test
public void testRetainAssignmentOnClusterRestart() throws Exception {
setupCluster();
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
assertEquals(NUM_OF_RS, threads.size());
int[] rsPorts = new int[NUM_OF_RS];
for (int i = 0; i < NUM_OF_RS; i++) {
rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
}
// We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
// use it to load all user region placements
SnapshotOfRegionAssignmentFromMeta snapshot =
new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
for (ServerName serverName : regionToRegionServerMap.values()) {
boolean found = false; // Test only, no need to optimize
for (int k = 0; k < NUM_OF_RS && !found; k++) {
found = serverName.getPort() == rsPorts[k];
}
assertTrue(found);
}
LOG.info("\n\nShutting down HBase cluster");
cluster.stopMaster(0);
cluster.shutdown();
cluster.waitUntilShutDown();
LOG.info("\n\nSleeping a bit");
Thread.sleep(2000);
LOG.info("\n\nStarting cluster the second time with the same ports");
cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
master = cluster.startMaster().getMaster();
for (int i = 0; i < NUM_OF_RS; i++) {
cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
cluster.startRegionServer();
}
ensureServersWithSamePort(master, rsPorts);
// Wait till master is initialized and all regions are assigned
for (TableName TABLE : TABLES) {
UTIL.waitTableAvailable(TABLE);
}
UTIL.waitUntilNoRegionsInTransition(60000);
snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
ServerName currentServer = entry.getValue();
LOG.info(
"Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
}
}
/**
* This tests retaining assignments on a single node restart
*/
@Test
public void testRetainAssignmentOnSingleRSRestart() throws Exception {
setupCluster();
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
assertEquals(NUM_OF_RS, threads.size());
int[] rsPorts = new int[NUM_OF_RS];
for (int i = 0; i < NUM_OF_RS; i++) {
rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
}
// We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
// use it to load all user region placements
SnapshotOfRegionAssignmentFromMeta snapshot =
new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
for (ServerName serverName : regionToRegionServerMap.values()) {
boolean found = false; // Test only, no need to optimize
for (int k = 0; k < NUM_OF_RS && !found; k++) {
found = serverName.getPort() == rsPorts[k];
}
assertTrue(found);
}
// Server to be restarted
ServerName deadRS = threads.get(0).getRegionServer().getServerName();
LOG.info("\n\nStopping HMaster and {} server", deadRS);
// Stopping master first so that region server SCP will not be initiated
cluster.stopMaster(0);
cluster.waitForMasterToStop(master.getServerName(), 5000);
cluster.stopRegionServer(deadRS);
LOG.info("\n\nSleeping a bit");
Thread.sleep(2000);
LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
master = cluster.startMaster().getMaster();
cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
cluster.startRegionServer();
ensureServersWithSamePort(master, rsPorts);
// Wait till master is initialized and all regions are assigned
for (TableName TABLE : TABLES) {
UTIL.waitTableAvailable(TABLE);
}
UTIL.waitUntilNoRegionsInTransition(60000);
snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
ServerName currentServer = entry.getValue();
LOG.info(
"Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
if (deadRS.getPort() == oldServer.getPort()) {
// Restarted RS start code wont be same
assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
} else {
assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
}
}
}
private void setupCluster() throws Exception, IOException, InterruptedException {
// Set Zookeeper based connection registry since we will stop master and start a new master
// without populating the underlying config for the connection.
UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
// Enable retain assignment during ServerCrashProcedure
UTIL.getConfiguration().setBoolean(ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT, true);
UTIL.startMiniCluster(NUM_OF_RS);
// Turn off balancer
UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);
LOG.info("\n\nCreating tables");
for (TableName TABLE : TABLES) {
UTIL.createTable(TABLE, FAMILY);
}
for (TableName TABLE : TABLES) {
UTIL.waitTableEnabled(TABLE);
}
UTIL.getMiniHBaseCluster().getMaster();
UTIL.waitUntilNoRegionsInTransition(60000);
}
private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
// Make sure live regionservers are on the same host/port
List<ServerName> localServers = master.getServerManager().getOnlineServersList();
assertEquals(NUM_OF_RS, localServers.size());
for (int i = 0; i < NUM_OF_RS; i++) {
boolean found = false;
for (ServerName serverName : localServers) {
if (serverName.getPort() == rsPorts[i]) {
found = true;
break;
}
}
assertTrue(found);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestRetainAssignmentOnRestartSplitWithoutZk
extends TestRetainAssignmentOnRestart {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRetainAssignmentOnRestartSplitWithoutZk.class);
@Override
protected boolean splitWALCoordinatedByZk() {
return false;
}
}