HBASE-17758 [RSGROUP] Add shell command to move servers and tables at the same time (Guangxu Cheng)

This commit is contained in:
Andrew Purtell 2017-03-16 18:37:40 -07:00
parent 8ad3add0d4
commit 7f0e6f1c9e
16 changed files with 2202 additions and 23 deletions

View File

@ -78,4 +78,14 @@ public interface RSGroupAdmin {
* @param hostPort HostPort to get RSGroupInfo for
*/
RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException;
/**
* Move given set of servers and tables to the specified target RegionServer group.
* @param servers set of servers to move
* @param tables set of tables to move
* @param targetGroup the target group name
* @throws IOException
*/
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String targetGroup) throws IOException;
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupI
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
@ -183,4 +184,25 @@ class RSGroupAdminClient implements RSGroupAdmin {
throw ProtobufUtil.handleRemoteException(e);
}
}
}
@Override
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup)
throws IOException {
MoveServersAndTablesRequest.Builder builder =
MoveServersAndTablesRequest.newBuilder().setTargetGroup(targetGroup);
for(Address el: servers) {
builder.addServers(HBaseProtos.ServerName.newBuilder()
.setHostName(el.getHostname())
.setPort(el.getPort())
.build());
}
for(TableName tableName: tables) {
builder.addTableName(ProtobufUtil.toProtoTableName(tableName));
}
try {
stub.moveServersAndTables(null, builder.build());
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
}

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupI
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
@ -243,6 +245,26 @@ public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService
}
done.run(builder.build());
}
@Override
public void moveServersAndTables(RpcController controller,
MoveServersAndTablesRequest request, RpcCallback<MoveServersAndTablesResponse> done) {
MoveServersAndTablesResponse.Builder builder = MoveServersAndTablesResponse.newBuilder();
try {
Set<Address> hostPorts = Sets.newHashSet();
for (HBaseProtos.ServerName el : request.getServersList()) {
hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
}
Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
for (HBaseProtos.TableName tableName : request.getTableNameList()) {
tables.add(ProtobufUtil.toTableName(tableName));
}
groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup());
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
done.run(builder.build());
}
}
/////////////////////////////////////////////////////////////////////////////

View File

@ -138,6 +138,139 @@ public class RSGroupAdminServer implements RSGroupAdmin {
else regions.addFirst(hri);
}
/**
* Check servers and tables.
* Fail if nulls or if servers and tables not belong to the same group
* @param servers servers to move
* @param tables tables to move
* @param targetGroupName target group name
* @throws IOException
*/
private void checkServersAndTables(Set<Address> servers, Set<TableName> tables,
String targetGroupName) throws IOException {
// Presume first server's source group. Later ensure all servers are from this group.
Address firstServer = servers.iterator().next();
RSGroupInfo tmpSrcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer);
if (tmpSrcGrp == null) {
// Be careful. This exception message is tested for in TestRSGroupsBase...
throw new ConstraintException("Source RSGroup for server " + firstServer
+ " does not exist.");
}
RSGroupInfo srcGrp = new RSGroupInfo(tmpSrcGrp);
if (srcGrp.getName().equals(targetGroupName)) {
throw new ConstraintException( "Target RSGroup " + targetGroupName +
" is same as source " + srcGrp.getName() + " RSGroup.");
}
// Only move online servers
checkOnlineServersOnly(servers);
// Ensure all servers are of same rsgroup.
for (Address server: servers) {
String tmpGroup = rsGroupInfoManager.getRSGroupOfServer(server).getName();
if (!tmpGroup.equals(srcGrp.getName())) {
throw new ConstraintException("Move server request should only come from one source " +
"RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
}
}
// Ensure all tables and servers are of same rsgroup.
for (TableName table : tables) {
String tmpGroup = rsGroupInfoManager.getRSGroupOfTable(table);
if (!tmpGroup.equals(srcGrp.getName())) {
throw new ConstraintException("Move table request should only come from one source " +
"RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
}
}
if (srcGrp.getServers().size() <= servers.size()
&& srcGrp.getTables().size() > tables.size() ) {
throw new ConstraintException("Cannot leave a RSGroup " + srcGrp.getName() +
" that contains tables without servers to host them.");
}
}
/**
* @param servers the servers that will move to new group
* @param targetGroupName the target group name
* @param tables The regions of tables assigned to these servers will not unassign
* @throws IOException
*/
private void unassignRegionFromServers(Set<Address> servers, String targetGroupName,
Set<TableName> tables) throws IOException {
boolean foundRegionsToUnassign;
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
Set<Address> allSevers = new HashSet<>(servers);
do {
foundRegionsToUnassign = false;
for (Iterator<Address> iter = allSevers.iterator(); iter.hasNext();) {
Address rs = iter.next();
// Get regions that are associated with this server and filter regions by tables.
List<HRegionInfo> regions = new ArrayList<>();
for (HRegionInfo region : getRegions(rs)) {
if (!tables.contains(region.getTable())) {
regions.add(region);
}
}
LOG.info("Unassigning " + regions.size() +
" region(s) from " + rs + " for server move to " + targetGroupName);
if (!regions.isEmpty()) {
for (HRegionInfo region: regions) {
// Regions might get assigned from tables of target group so we need to filter
if (!targetGrp.containsTable(region.getTable())) {
this.master.getAssignmentManager().unassign(region);
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
continue;
}
foundRegionsToUnassign = true;
}
}
}
if (!foundRegionsToUnassign) {
iter.remove();
}
}
try {
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
}
} while (foundRegionsToUnassign);
}
/**
* @param tables the tables that will move to new group
* @param targetGroupName the target group name
* @param servers the regions of tables assigned to these servers will not unassign
* @throws IOException
*/
private void unassignRegionFromTables(Set<TableName> tables, String targetGroupName,
Set<Address> servers) throws IOException {
for (TableName table: tables) {
LOG.info("Unassigning region(s) from " + table + " for table move to " + targetGroupName);
LockManager.MasterLock lock = master.getLockManager().createMasterLock(table,
LockProcedure.LockType.EXCLUSIVE, this.getClass().getName() + ": RSGroup: table move");
try {
try {
lock.acquire();
} catch (InterruptedException e) {
throw new IOException("Interrupted when waiting for table lock", e);
}
for (HRegionInfo region :
master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
ServerName sn = master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
if (!servers.contains(sn.getAddress())) {
master.getAssignmentManager().unassign(region);
}
}
} finally {
lock.release();
}
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
justification="Ignoring complaint because don't know what it is complaining about")
@ -420,6 +553,45 @@ public class RSGroupAdminServer implements RSGroupAdmin {
return rsGroupInfoManager.getRSGroupOfServer(hostPort);
}
@Override
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup)
throws IOException {
if (servers == null || servers.isEmpty() ) {
throw new ConstraintException("The list of servers to move cannot be null or empty.");
}
if (tables == null || tables.isEmpty()) {
throw new ConstraintException("The list of tables to move cannot be null or empty.");
}
//check target group
getAndCheckRSGroupInfo(targetGroup);
// Hold a lock on the manager instance while moving servers and tables to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup);
}
//check servers and tables status
checkServersAndTables(servers, tables, targetGroup);
//Move servers and tables to a new group.
String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName();
rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup);
//unassign regions which not belong to these tables
unassignRegionFromServers(servers, targetGroup, tables);
//unassign regions which not assigned to these servers
unassignRegionFromTables(tables, targetGroup, servers);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup);
}
}
LOG.info("Move servers and tables done. Severs :"
+ servers + " , Tables : " + tables + " => " + targetGroup);
}
private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
throws IOException {
Map<String, RegionState> rit = Maps.newTreeMap();

View File

@ -104,4 +104,14 @@ public interface RSGroupInfoManager {
* @return whether the manager is in online mode
*/
boolean isOnline();
/**
* Move servers and tables to a new group.
* @param servers list of servers, must be part of the same group
* @param tables set of tables to move
* @param srcGroup groupName being moved from
* @param dstGroup groupName being moved to
*/
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String srcGroup, String dstGroup) throws IOException;
}

View File

@ -276,6 +276,30 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return rsGroupStartupWorker.isOnline();
}
@Override
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String srcGroup, String dstGroup) throws IOException {
//get server's group
RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
//move servers
for (Address el: servers) {
srcGroupInfo.removeServer(el);
dstGroupInfo.addServer(el);
}
//move tables
for(TableName tableName: tables) {
srcGroupInfo.removeTable(tableName);
dstGroupInfo.addTable(tableName);
}
//flush changed groupinfo
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
flushConfig(newGroupMap);
}
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();

View File

@ -106,6 +106,15 @@ message GetRSGroupInfoOfServerResponse {
optional RSGroupInfo r_s_group_info = 1;
}
message MoveServersAndTablesRequest {
required string target_group = 1;
repeated ServerName servers = 2;
repeated TableName table_name = 3;
}
message MoveServersAndTablesResponse {
}
service RSGroupAdminService {
rpc GetRSGroupInfo(GetRSGroupInfoRequest)
returns (GetRSGroupInfoResponse);
@ -133,4 +142,7 @@ service RSGroupAdminService {
rpc ListRSGroupInfos(ListRSGroupInfosRequest)
returns (ListRSGroupInfosResponse);
rpc MoveServersAndTables(MoveServersAndTablesRequest)
returns (MoveServersAndTablesResponse);
}

View File

@ -691,4 +691,115 @@ public abstract class TestRSGroupsBase {
assertTrue(newGroupTables.contains(tableNameA));
assertTrue(newGroupTables.contains(tableNameB));
}
@Test
public void testMoveServersAndTables() throws Exception {
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
//create table
final byte[] familyNameBytes = Bytes.toBytes("f");
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 5);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
List<String> regions = getTableRegionMap().get(tableName);
if (regions == null)
return false;
return getTableRegionMap().get(tableName).size() >= 5;
}
});
//get server which is not a member of new group
ServerName targetServer = null;
for(ServerName server : admin.getClusterStatus().getServers()) {
if(!newGroup.containsServer(server.getAddress()) &&
!rsGroupAdmin.getRSGroupInfo("master").containsServer(server.getAddress())) {
targetServer = server;
break;
}
}
//test fail bogus server move
try {
rsGroupAdmin.moveServersAndTables(Sets.newHashSet(Address.fromString("foo:9999")),
Sets.newHashSet(tableName), newGroup.getName());
fail("Bogus servers shouldn't have been successfully moved.");
} catch(IOException ex) {
String exp = "Source RSGroup for server foo:9999 does not exist.";
String msg = "Expected '" + exp + "' in exception message: ";
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
}
//test fail server move
try {
rsGroupAdmin.moveServersAndTables(Sets.newHashSet(targetServer.getAddress()),
Sets.newHashSet(tableName), RSGroupInfo.DEFAULT_GROUP);
fail("servers shouldn't have been successfully moved.");
} catch(IOException ex) {
String exp = "Target RSGroup " + RSGroupInfo.DEFAULT_GROUP +
" is same as source " + RSGroupInfo.DEFAULT_GROUP + " RSGroup.";
String msg = "Expected '" + exp + "' in exception message: ";
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
}
//verify default group info
Assert.assertEquals(3,
rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size());
Assert.assertEquals(4,
rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getTables().size());
//verify new group info
Assert.assertEquals(1,
rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers().size());
Assert.assertEquals(0,
rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getTables().size());
//get all region to move targetServer
List<String> regionList = getTableRegionMap().get(tableName);
for(String region : regionList) {
// Lets move this region to the targetServer
TEST_UTIL.getAdmin().move(Bytes.toBytes(HRegionInfo.encodeRegionName(Bytes.toBytes(region))),
Bytes.toBytes(targetServer.getServerName()));
}
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getTableRegionMap().get(tableName) != null &&
getTableRegionMap().get(tableName).size() == 5 &&
getTableServerRegionMap().get(tableName).size() == 1 &&
admin.getClusterStatus().getRegionsInTransition().size() < 1;
}
});
//verify that all region move to targetServer
Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size());
//move targetServer and table to newGroup
LOG.info("moving server and table to newGroup");
rsGroupAdmin.moveServersAndTables(Sets.newHashSet(targetServer.getAddress()),
Sets.newHashSet(tableName), newGroup.getName());
//verify group change
Assert.assertEquals(newGroup.getName(),
rsGroupAdmin.getRSGroupInfoOfTable(tableName).getName());
//verify servers' not exist in old group
Set<Address> defaultServers = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers();
assertFalse(defaultServers.contains(targetServer.getAddress()));
//verify servers' exist in new group
Set<Address> newGroupServers = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers();
assertTrue(newGroupServers.contains(targetServer.getAddress()));
//verify tables' not exist in old group
Set<TableName> defaultTables = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getTables();
assertFalse(defaultTables.contains(tableName));
//verify tables' exist in new group
Set<TableName> newGroupTables = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getTables();
assertTrue(newGroupTables.contains(tableName));
//verify that all region still assgin on targetServer
Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size());
}
}

View File

@ -103,6 +103,12 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
return wrapped.getRSGroupOfServer(hostPort);
}
@Override
public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
wrapped.moveServersAndTables(servers, tables, targetGroup);
verify();
}
public void verify() throws IOException {
Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
Set<RSGroupInfo> zList = Sets.newHashSet();

View File

@ -1646,6 +1646,24 @@ public interface MasterObserver extends Coprocessor {
final ObserverContext<MasterCoprocessorEnvironment> c,
final HRegionInfo[] regionsToMerge) throws IOException {}
/**
* Called before servers are moved to target region server group
* @param ctx the environment to interact with the framework and master
* @param servers set of servers to move
* @param targetGroup destination group
*/
default void preMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {}
/**
* Called after servers are moved to target region server group
* @param ctx the environment to interact with the framework and master
* @param servers set of servers to move
* @param targetGroup name of group
*/
default void postMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {}
/**
* Called before servers are moved to target region server group
* @param ctx the environment to interact with the framework and master

View File

@ -1511,6 +1511,32 @@ public class MasterCoprocessorHost
return bypass;
}
public void preMoveServersAndTables(final Set<Address> servers, final Set<TableName> tables, final String targetGroup)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver,
ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
if(((MasterEnvironment)ctx.getEnvironment()).supportGroupCPs) {
oserver.preMoveServersAndTables(ctx, servers, tables, targetGroup);
}
}
});
}
public void postMoveServersAndTables(final Set<Address> servers, final Set<TableName> tables, final String targetGroup)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver,
ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
if(((MasterEnvironment)ctx.getEnvironment()).supportGroupCPs) {
oserver.postMoveServersAndTables(ctx, servers, tables, targetGroup);
}
}
});
}
public void preMoveServers(final Set<Address> servers, final String targetGroup)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {

View File

@ -2667,6 +2667,12 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
List<WALEntry> entries, CellScanner cells) throws IOException {
}
@Override
public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
requirePermission(getActiveUser(ctx), "moveServersAndTables", Action.ADMIN);
}
@Override
public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, String targetGroup) throws IOException {

View File

@ -1460,6 +1460,16 @@ public class TestMasterObserver {
final String namespace, final Quotas quotas) throws IOException {
}
@Override
public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
}
@Override
public void postMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables,String targetGroup) throws IOException {
}
@Override
public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, String targetGroup) throws IOException {

View File

@ -142,5 +142,19 @@ module Hbase
res
end
#--------------------------------------------------------------------------
# move server and table to a group
def move_servers_tables(dest, *args)
servers = java.util.HashSet.new
tables = java.util.HashSet.new;
args[0].each do |s|
servers.add(org.apache.hadoop.hbase.net.Address.fromString(s))
end
args[1].each do |t|
tables.add(org.apache.hadoop.hbase.TableName.valueOf(t))
end
@admin.moveServersAndTables(servers, tables, dest)
end
end
end

View File

@ -470,6 +470,7 @@ Shell.load_command_group(
balance_rsgroup
move_servers_rsgroup
move_tables_rsgroup
move_servers_tables_rsgroup
get_server_rsgroup
get_table_rsgroup
]