HBASE-19326 Remove decommissioned servers from rsgroup

Signed-off-by: Michael Stack <stack@apache.org>
Guangxu Cheng 2017-12-01 03:48:29 +08:00 committed by Michael Stack
parent c64546aa31
commit cc3f804b07
16 changed files with 376 additions and 4 deletions


@@ -88,4 +88,14 @@ public interface RSGroupAdmin {
*/
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String targetGroup) throws IOException;
/**
* Remove decommissioned servers from rsgroup.
* 1. A server may abort because of a hardware failure and must be taken offline for repair, or
* some servers may need to be moved to join another cluster. In either case those servers have
* to be removed from their rsgroup first.
* 2. Dead/recovering/live servers will be disallowed.
* @param servers set of servers to remove
*/
void removeServers(Set<Address> servers) throws IOException;
}
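Below is a minimal client-side sketch, not part of this commit, of how the new API might be invoked. It assumes code placed in the org.apache.hadoop.hbase.rsgroup package (RSGroupAdminClient is package-private), and the decommissioned host and port are hypothetical; the target server must be neither online nor still on the dead-servers list, otherwise the call fails with a ConstraintException.

```java
package org.apache.hadoop.hbase.rsgroup; // needed because RSGroupAdminClient is package-private

import java.util.Collections;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.net.Address;

public class RemoveServersExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
      RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
      // Hypothetical decommissioned region server. It must already be stopped and cleared
      // from the dead-servers list, or removeServers() throws a ConstraintException.
      rsGroupAdmin.removeServers(
          Collections.singleton(Address.fromParts("decommissioned-host.example.com", 16020)));
    }
  }
}
```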


@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServers
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
@@ -205,4 +206,23 @@ class RSGroupAdminClient implements RSGroupAdmin {
throw ProtobufUtil.handleRemoteException(e);
}
}
@Override
public void removeServers(Set<Address> servers) throws IOException {
Set<HBaseProtos.ServerName> hostPorts = Sets.newHashSet();
for(Address el: servers) {
hostPorts.add(HBaseProtos.ServerName.newBuilder()
.setHostName(el.getHostname())
.setPort(el.getPort())
.build());
}
RemoveServersRequest request = RemoveServersRequest.newBuilder()
.addAllServers(hostPorts)
.build();
try {
stub.removeServers(null, request);
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
}


@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.rsgroup;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -32,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.SnapshotDescription;
@@ -70,6 +73,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesR
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.yetus.audience.InterfaceAudience;
@@ -289,6 +294,26 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
}
done.run(builder.build());
}
@Override
public void removeServers(RpcController controller,
RemoveServersRequest request,
RpcCallback<RemoveServersResponse> done) {
RemoveServersResponse.Builder builder =
RemoveServersResponse.newBuilder();
try {
Set<Address> servers = Sets.newHashSet();
for (HBaseProtos.ServerName el : request.getServersList()) {
servers.add(Address.fromParts(el.getHostName(), el.getPort()));
}
LOG.info(master.getClientIdAuditPrefix()
+ " remove decommissioned servers from rsgroup: " + servers);
groupAdminServer.removeServers(servers);
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
done.run(builder.build());
}
}
void assignTableToGroup(TableDescriptor desc) throws IOException {
@@ -357,5 +382,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
assignTableToGroup(desc);
}
@Override
public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, List<ServerName> notClearedServers)
throws IOException {
Set<Address> clearedServer = servers.stream().
filter(server -> !notClearedServers.contains(server)).
map(ServerName::getAddress).
collect(Collectors.toSet());
groupAdminServer.removeServers(clearedServer);
}
/////////////////////////////////////////////////////////////////////////////
}
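The postClearDeadServers hook above is what ties Admin.clearDeadServers() to rsgroup bookkeeping: every server the master actually clears is also dropped from its rsgroup. A sketch of that flow from a client's point of view, not part of this commit (host, port, start code, and group name are hypothetical):

```java
package org.apache.hadoop.hbase.rsgroup; // so the package-private RSGroupAdminClient is visible

import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ClearDeadServersExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      // A region server that was stopped earlier and now sits on the dead-servers list.
      ServerName dead =
          ServerName.valueOf("decommissioned-host.example.com", 16020, 1512086400000L);
      // Clearing it triggers postClearDeadServers(), which in turn calls
      // RSGroupAdminServer.removeServers() for every address that was actually cleared.
      admin.clearDeadServers(Arrays.asList(dead));
      // The address should no longer appear in its former rsgroup.
      RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
      System.out.println(rsGroupAdmin.getRSGroupInfo("my_group").getServers());
    }
  }
}
```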


@@ -583,6 +583,29 @@ public class RSGroupAdminServer implements RSGroupAdmin {
+ servers + " , Tables : " + tables + " => " + targetGroup);
}
@Override
public void removeServers(Set<Address> servers) throws IOException {
{
if (servers == null || servers.isEmpty()) {
throw new ConstraintException("The set of servers to remove cannot be null or empty.");
}
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveServers(servers);
}
// Check that the given servers are neither online nor on the dead-servers list.
checkForDeadOrOnlineServers(servers);
rsGroupInfoManager.removeServers(servers);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveServers(servers);
}
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done.");
}
}
}
private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
throws IOException {
Map<String, RegionState> rit = Maps.newTreeMap();
@@ -634,4 +657,33 @@
return result;
}
/**
* Check whether any of the given servers belongs to the dead-servers list or the online-servers list.
* @param servers servers to remove
*/
private void checkForDeadOrOnlineServers(Set<Address> servers) throws ConstraintException {
// This ugliness is because we only have Address, not ServerName.
Set<Address> onlineServers = new HashSet<>();
for(ServerName server: master.getServerManager().getOnlineServers().keySet()) {
onlineServers.add(server.getAddress());
}
Set<Address> deadServers = new HashSet<>();
for(ServerName server: master.getServerManager().getDeadServers().copyServerNames()) {
deadServers.add(server.getAddress());
}
for (Address address: servers) {
if (onlineServers.contains(address)) {
throw new ConstraintException(
"Server " + address + " is an online server, not allowed to remove.");
}
if (deadServers.contains(address)) {
throw new ConstraintException(
"Server " + address + " is on the dead servers list,"
+ " Maybe it will come back again, not allowed to remove.");
}
}
}
}


@@ -117,4 +117,10 @@ public interface RSGroupInfoManager {
*/
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String srcGroup, String dstGroup) throws IOException;
/**
* Remove decommissioned servers from rsgroup.
* @param servers set of servers to remove
*/
void removeServers(Set<Address> servers) throws IOException;
}


@@ -309,6 +309,32 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
flushConfig(newGroupMap);
}
@Override
public synchronized void removeServers(Set<Address> servers) throws IOException {
Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
for (Address el: servers) {
RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
if (rsGroupInfo != null) {
RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
if (newRsGroupInfo == null) {
rsGroupInfo.removeServer(el);
rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
} else {
newRsGroupInfo.removeServer(el);
rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
}
} else {
LOG.warn("Server " + el + " does not belong to any rsgroup.");
}
}
if (rsGroupInfos.size() > 0) {
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.putAll(rsGroupInfos);
flushConfig(newGroupMap);
}
}
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
for (Result result : rsGroupTable.getScanner(new Scan())) {


@@ -115,6 +115,13 @@ message MoveServersAndTablesRequest {
message MoveServersAndTablesResponse {
}
message RemoveServersRequest {
repeated ServerName servers = 1;
}
message RemoveServersResponse {
}
service RSGroupAdminService {
rpc GetRSGroupInfo(GetRSGroupInfoRequest)
returns (GetRSGroupInfoResponse);
@@ -145,4 +152,7 @@ service RSGroupAdminService {
rpc MoveServersAndTables(MoveServersAndTablesRequest)
returns (MoveServersAndTablesResponse);
rpc RemoveServers(RemoveServersRequest)
returns (RemoveServersResponse);
}


@@ -74,10 +74,10 @@ public class TestRSGroups extends TestRSGroupsBase {
RSGroupBasedLoadBalancer.class.getName());
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
RSGroupAdminEndpoint.class.getName());
- TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
- TEST_UTIL.getConfiguration().set(
+ TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
+ TEST_UTIL.getConfiguration().setInt(
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
- ""+NUM_SLAVES_BASE);
+ NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
admin = TEST_UTIL.getAdmin();


@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.EnumSet;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
public abstract class TestRSGroupsBase {
protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class);
@@ -863,4 +866,111 @@ public abstract class TestRSGroupsBase {
//verify that all regions are still assigned on targetServer
Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size());
}
}
@Test
public void testClearDeadServers() throws Exception {
LOG.info("testClearDeadServers");
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 3);
ServerName targetServer = ServerName.parseServerName(
newGroup.getServers().iterator().next().toString());
AdminProtos.AdminService.BlockingInterface targetRS =
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
try {
targetServer = ProtobufUtil.toServerName(targetRS.getServerInfo(null,
GetServerInfoRequest.newBuilder().build()).getServerInfo().getServerName());
//stopping may cause an exception
//due to the connection loss
targetRS.stopServer(null,
AdminProtos.StopServerRequest.newBuilder().setReason("Die").build());
} catch(Exception e) {
}
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
//wait for the stopped regionserver to show up in the dead servers list
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !master.getServerManager().areDeadServersInProgress()
&& cluster.getClusterStatus().getDeadServerNames().size() > 0;
}
});
assertFalse(cluster.getClusterStatus().getServers().contains(targetServer));
assertTrue(cluster.getClusterStatus().getDeadServerNames().contains(targetServer));
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
//clear dead servers list
List<ServerName> notClearedServers = admin.clearDeadServers(Lists.newArrayList(targetServer));
assertEquals(0, notClearedServers.size());
Set<Address> newGroupServers = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers();
assertFalse(newGroupServers.contains(targetServer.getAddress()));
assertEquals(2, newGroupServers.size());
}
@Test
public void testRemoveServers() throws Exception {
LOG.info("testRemoveServers");
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 3);
ServerName targetServer = ServerName.parseServerName(
newGroup.getServers().iterator().next().toString());
try {
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
fail("Online servers shouldn't have been successfully removed.");
} catch(IOException ex) {
String exp = "Server " + targetServer.getAddress()
+ " is an online server, not allowed to remove.";
String msg = "Expected '" + exp + "' in exception message: ";
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
}
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
AdminProtos.AdminService.BlockingInterface targetRS =
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
try {
targetServer = ProtobufUtil.toServerName(targetRS.getServerInfo(null,
GetServerInfoRequest.newBuilder().build()).getServerInfo().getServerName());
//stopping may cause an exception
//due to the connection loss
targetRS.stopServer(null,
AdminProtos.StopServerRequest.newBuilder().setReason("Die").build());
} catch(Exception e) {
}
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
//wait for the stopped regionserver to show up in the dead servers list
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !master.getServerManager().areDeadServersInProgress()
&& cluster.getClusterStatus().getDeadServerNames().size() > 0;
}
});
try {
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
fail("Dead servers shouldn't have been successfully removed.");
} catch(IOException ex) {
String exp = "Server " + targetServer.getAddress() + " is on the dead servers list,"
+ " Maybe it will come back again, not allowed to remove.";
String msg = "Expected '" + exp + "' in exception message: ";
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
}
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
ServerName sn = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
TEST_UTIL.getHBaseClusterInterface().stopMaster(sn);
TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(sn, 60000);
TEST_UTIL.getHBaseClusterInterface().startMaster(sn.getHostname(), 0);
TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(60000);
assertEquals(3, cluster.getClusterStatus().getServersSize());
assertFalse(cluster.getClusterStatus().getServers().contains(targetServer));
assertFalse(cluster.getClusterStatus().getDeadServerNames().contains(targetServer));
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
Set<Address> newGroupServers = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers();
assertFalse(newGroupServers.contains(targetServer.getAddress()));
assertEquals(2, newGroupServers.size());
}
}


@@ -110,6 +110,12 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
verify();
}
@Override
public void removeServers(Set<Address> servers) throws IOException {
wrapped.removeServers(servers);
verify();
}
public void verify() throws IOException {
Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
Set<RSGroupInfo> zList = Sets.newHashSet();


@@ -1098,6 +1098,24 @@ public interface MasterObserver {
default void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String groupName, boolean balancerRan) throws IOException {}
/**
* Called before servers are removed from rsgroup
* @param ctx the environment to interact with the framework and master
* @param servers set of decommissioned servers to remove
*/
default void preRemoveServers(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers) throws IOException {}
/**
* Called after servers are removed from rsgroup
* @param ctx the environment to interact with the framework and master
* @param servers set of servers that were removed
*/
default void postRemoveServers(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers) throws IOException {}
/**
* Called before add a replication peer
* @param ctx the environment to interact with the framework and master
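The pre/postRemoveServers hooks added above let other coprocessors audit or veto the operation. As a sketch, not part of this commit, a custom observer might refuse to drop a protected host (class, package, and host name are hypothetical); it would be registered through hbase.coprocessor.master.classes, the same way TestRSGroups wires up RSGroupAdminEndpoint.

```java
package org.apache.hadoop.hbase.example; // hypothetical package

import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.net.Address;

public class GuardedRemoveServersObserver implements MasterCoprocessor, MasterObserver {
  // Hypothetical host that must never be dropped from its rsgroup.
  private static final String PROTECTED_HOST = "critical-host.example.com";

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
      Set<Address> servers) throws IOException {
    for (Address server : servers) {
      if (PROTECTED_HOST.equals(server.getHostname())) {
        throw new ConstraintException("Refusing to remove protected server " + server);
      }
    }
  }
}
```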


@@ -1401,6 +1401,30 @@ public class MasterCoprocessorHost
});
}
public void preRemoveServers(final Set<Address> servers)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
if(((MasterEnvironment)getEnvironment()).supportGroupCPs) {
observer.preRemoveServers(this, servers);
}
}
});
}
public void postRemoveServers(final Set<Address> servers)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
if(((MasterEnvironment)getEnvironment()).supportGroupCPs) {
observer.postRemoveServers(this, servers);
}
}
});
}
public void preAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {


@@ -2689,6 +2689,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
requirePermission(getActiveUser(ctx), "balanceRSGroup", Action.ADMIN);
}
@Override
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers) throws IOException {
requirePermission(getActiveUser(ctx), "removeServers", Action.ADMIN);
}
@Override
public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId, ReplicationPeerConfig peerConfig) throws IOException {
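The preRemoveServers check above gates the new RPC on global ADMIN permission, so a non-superuser operator needs a global grant before calling removeServers. A hedged sketch using AccessControlClient's global-grant overload (the user name is hypothetical):

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;

public class GrantRemoveServersPermission {
  public static void main(String[] args) throws Throwable {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
      // Global ADMIN lets "ops_user" pass the requirePermission(..., Action.ADMIN) check
      // performed in AccessController.preRemoveServers().
      AccessControlClient.grant(conn, "ops_user", Permission.Action.ADMIN);
    }
  }
}
```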


@@ -118,5 +118,17 @@ module Hbase
end
@admin.moveServersAndTables(servers, tables, dest)
end
#--------------------------------------------------------------------------
# remove decommissioned servers from rsgroup
def remove_servers(*args)
# Flatten params array
args = args.flatten.compact
servers = java.util.HashSet.new
args.each do |s|
servers.add(org.apache.hadoop.hbase.net.Address.fromString(s))
end
@admin.removeServers(servers)
end
end
end


@@ -483,5 +483,6 @@ Shell.load_command_group(
move_servers_tables_rsgroup
get_server_rsgroup
get_table_rsgroup
remove_servers_rsgroup
]
)


@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class RemoveServersRsgroup < Command
def help
<<-EOF
Remove decommissioned servers from rsgroup.
Dead/recovering/live servers will be disallowed.
Example:
hbase> remove_servers_rsgroup ['server1:port','server2:port']
EOF
end
def command(servers)
rsgroup_admin.remove_servers(servers)
end
end
end
end