HBASE-17654 RSGroup refactoring.

Changes contain:
- Making rsGroupInfoManager non-static in RSGroupAdminEndpoint
- Encapsulate RSGroupAdminService into an internal class in RSGroupAdminEndpoint (on need of inheritence).
- Change two internal classes in RSGroupAdminServer to non-static (so outer classes' variables can be shared).
- Rename RSGroupSerDe to RSGroupProtobufUtil('ProtobufUtil' is what we use in other places). Moved 2 functions to RSGroupManagerImpl because they are only used there.
- Javadoc comments
- Improving variable names
- Maybe other misc refactoring

Change-Id: I09f0f5aa413150390c91795b8a8fd5e6cdd6c416
This commit is contained in:
Apekshit Sharma 2017-02-16 02:00:37 -08:00
parent f4bf375ea1
commit ce64e7eb6e
17 changed files with 658 additions and 866 deletions

View File

@ -34,15 +34,14 @@ import org.apache.hadoop.hbase.net.Address;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RSGroupInfo {
public static final String DEFAULT_GROUP = "default";
public static final String NAMESPACEDESC_PROP_GROUP = "hbase.rsgroup.name";
public static final String NAMESPACE_DESC_PROP_GROUP = "hbase.rsgroup.name";
private String name;
private final String name;
// Keep servers in a sorted set so has an expected ordering when displayed.
private SortedSet<Address> servers;
private final SortedSet<Address> servers;
// Keep tables sorted too.
private SortedSet<TableName> tables;
private final SortedSet<TableName> tables;
public RSGroupInfo(String name) {
this(name, new TreeSet<Address>(), new TreeSet<TableName>());
@ -50,7 +49,7 @@ public class RSGroupInfo {
RSGroupInfo(String name, SortedSet<Address> servers, SortedSet<TableName> tables) {
this.name = name;
this.servers = servers == null? new TreeSet<Address>(): servers;
this.servers = servers == null? new TreeSet<>(): servers;
this.servers.addAll(servers);
this.tables = new TreeSet<>(tables);
}
@ -61,26 +60,20 @@ public class RSGroupInfo {
/**
* Get group name.
*
* @return group name
*/
public String getName() {
return name;
}
/**
* Adds the server to the group.
*
* @param hostPort the server
* Adds the given server to the group.
*/
public void addServer(Address hostPort){
servers.add(hostPort);
}
/**
* Adds a group of servers.
*
* @param hostPort the servers
* Adds the given servers to the group.
*/
public void addAllServers(Collection<Address> hostPort){
servers.addAll(hostPort);
@ -96,25 +89,20 @@ public class RSGroupInfo {
/**
* Get list of servers.
*
* @return set of servers
*/
public Set<Address> getServers() {
return servers;
}
/**
* Remove a server from this group.
*
* @param hostPort HostPort of the server to remove
* Remove given server from the group.
*/
public boolean removeServer(Address hostPort) {
return servers.remove(hostPort);
}
/**
* Set of tables that are members of this group
* @return set of tables
* Get set of tables that are members of the group.
*/
public SortedSet<TableName> getTables() {
return tables;

View File

@ -29,20 +29,18 @@ import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Runs all of the units tests defined in TestGroupBase
* as an integration test.
* Runs all of the units tests defined in TestGroupBase as an integration test.
* Requires TestRSGroupBase.NUM_SLAVE_BASE servers to run.
*/
@Category(IntegrationTests.class)
public class IntegrationTestRSGroup extends TestRSGroupsBase {
//Integration specific
private final static Log LOG = LogFactory.getLog(IntegrationTestRSGroup.class);
private static boolean initialized = false;
@BeforeClass
public void beforeMethod() throws Exception {
if(!initialized) {
LOG.info("Setting up IntegrationTestGroup");
LOG.info("Setting up IntegrationTestRSGroup");
LOG.info("Initializing cluster with " + NUM_SLAVES_BASE + " servers");
TEST_UTIL = new IntegrationTestingUtility();
((IntegrationTestingUtility)TEST_UTIL).initializeCluster(NUM_SLAVES_BASE);

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.rsgroup;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -32,76 +31,51 @@ import org.apache.hadoop.hbase.net.Address;
@InterfaceAudience.Private
public interface RSGroupAdmin {
/**
* Gets the regionserver group information.
*
* @param groupName the group name
* @return An instance of RSGroupInfo
* Gets {@code RSGroupInfo} for given group name.
*/
RSGroupInfo getRSGroupInfo(String groupName) throws IOException;
/**
* Gets the regionserver group info of table.
*
* @param tableName the table name
* @return An instance of RSGroupInfo.
* Gets {@code RSGroupInfo} for the given table's group.
*/
RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException;
/**
* Move a set of serves to another group
*
*
* @param servers set of servers, must be in the form HOST:PORT
* @param targetGroup the target group
* @throws java.io.IOException Signals that an I/O exception has occurred.
* Move given set of servers to the specified target RegionServer group.
*/
void moveServers(Set<Address> servers, String targetGroup)
throws IOException;
void moveServers(Set<Address> servers, String targetGroup) throws IOException;
/**
* Move tables to a new group.
* Move given set of tables to the specified target RegionServer group.
* This will unassign all of a table's region so it can be reassigned to the correct group.
* @param tables list of tables to move
* @param targetGroup target group
* @throws java.io.IOException on failure to move tables
*/
void moveTables(Set<TableName> tables, String targetGroup) throws IOException;
/**
* Add a new group
* @param name name of the group
* @throws java.io.IOException on failure to add group
* Creates a new RegionServer group with the given name.
*/
void addRSGroup(String name) throws IOException;
void addRSGroup(String groupName) throws IOException;
/**
* Remove a regionserver group
* @param name name of the group
* @throws java.io.IOException on failure to remove group
* Removes RegionServer group associated with the given name.
*/
void removeRSGroup(String name) throws IOException;
void removeRSGroup(String groupName) throws IOException;
/**
* Balance the regions in a group
* Balance regions in the given RegionServer group.
*
* @param name the name of the group to balance
* @return boolean whether balance ran or not
* @throws java.io.IOException on unexpected failure to balance group
* @return boolean Whether balance ran or not
*/
boolean balanceRSGroup(String name) throws IOException;
boolean balanceRSGroup(String groupName) throws IOException;
/**
* Lists the existing groups.
*
* @return Collection of RSGroupInfo.
* Lists current set of RegionServer groups.
*/
List<RSGroupInfo> listRSGroups() throws IOException;
/**
* Retrieve the RSGroupInfo a server is affiliated to
* @param hostPort HostPort to get RSGroupInfo for
* @return RSGroupInfo associated with the server
* @throws java.io.IOException on unexpected failure to retrieve GroupInfo
*/
RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException;
}

View File

@ -28,36 +28,42 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
/**
* Client used for managing region server group information.
*/
@InterfaceAudience.Private
class RSGroupAdminClient implements RSGroupAdmin {
private RSGroupAdminProtos.RSGroupAdminService.BlockingInterface stub;
private RSGroupAdminService.BlockingInterface stub;
public RSGroupAdminClient(Connection conn) throws IOException {
stub = RSGroupAdminProtos.RSGroupAdminService.newBlockingStub(
conn.getAdmin().coprocessorService());
stub = RSGroupAdminService.newBlockingStub(conn.getAdmin().coprocessorService());
}
@Override
public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
try {
RSGroupAdminProtos.GetRSGroupInfoResponse resp =
stub.getRSGroupInfo(null,
RSGroupAdminProtos.GetRSGroupInfoRequest.newBuilder()
.setRSGroupName(groupName).build());
GetRSGroupInfoResponse resp = stub.getRSGroupInfo(null,
GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build());
if(resp.hasRSGroupInfo()) {
return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo());
return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo());
}
return null;
} catch (ServiceException e) {
@ -67,14 +73,12 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
RSGroupAdminProtos.GetRSGroupInfoOfTableRequest request =
RSGroupAdminProtos.GetRSGroupInfoOfTableRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
GetRSGroupInfoOfTableRequest request = GetRSGroupInfoOfTableRequest.newBuilder().setTableName(
ProtobufUtil.toProtoTableName(tableName)).build();
try {
GetRSGroupInfoOfTableResponse resp = stub.getRSGroupInfoOfTable(null, request);
if (resp.hasRSGroupInfo()) {
return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo());
return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo());
}
return null;
} catch (ServiceException e) {
@ -91,11 +95,10 @@ class RSGroupAdminClient implements RSGroupAdmin {
.setPort(el.getPort())
.build());
}
RSGroupAdminProtos.MoveServersRequest request =
RSGroupAdminProtos.MoveServersRequest.newBuilder()
MoveServersRequest request = MoveServersRequest.newBuilder()
.setTargetGroup(targetGroup)
.addAllServers(hostPorts).build();
.addAllServers(hostPorts)
.build();
try {
stub.moveServers(null, request);
} catch (ServiceException e) {
@ -105,9 +108,7 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
RSGroupAdminProtos.MoveTablesRequest.Builder builder =
RSGroupAdminProtos.MoveTablesRequest.newBuilder()
.setTargetGroup(targetGroup);
MoveTablesRequest.Builder builder = MoveTablesRequest.newBuilder().setTargetGroup(targetGroup);
for(TableName tableName: tables) {
builder.addTableName(ProtobufUtil.toProtoTableName(tableName));
}
@ -120,9 +121,7 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public void addRSGroup(String groupName) throws IOException {
RSGroupAdminProtos.AddRSGroupRequest request =
RSGroupAdminProtos.AddRSGroupRequest.newBuilder()
.setRSGroupName(groupName).build();
AddRSGroupRequest request = AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build();
try {
stub.addRSGroup(null, request);
} catch (ServiceException e) {
@ -132,9 +131,7 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public void removeRSGroup(String name) throws IOException {
RSGroupAdminProtos.RemoveRSGroupRequest request =
RSGroupAdminProtos.RemoveRSGroupRequest.newBuilder()
.setRSGroupName(name).build();
RemoveRSGroupRequest request = RemoveRSGroupRequest.newBuilder().setRSGroupName(name).build();
try {
stub.removeRSGroup(null, request);
} catch (ServiceException e) {
@ -143,11 +140,9 @@ class RSGroupAdminClient implements RSGroupAdmin {
}
@Override
public boolean balanceRSGroup(String name) throws IOException {
RSGroupAdminProtos.BalanceRSGroupRequest request =
RSGroupAdminProtos.BalanceRSGroupRequest.newBuilder()
.setRSGroupName(name).build();
public boolean balanceRSGroup(String groupName) throws IOException {
BalanceRSGroupRequest request = BalanceRSGroupRequest.newBuilder()
.setRSGroupName(groupName).build();
try {
return stub.balanceRSGroup(null, request).getBalanceRan();
} catch (ServiceException e) {
@ -158,12 +153,11 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public List<RSGroupInfo> listRSGroups() throws IOException {
try {
List<RSGroupProtos.RSGroupInfo> resp =
stub.listRSGroupInfos(null,
RSGroupAdminProtos.ListRSGroupInfosRequest.newBuilder().build()).getRSGroupInfoList();
List<RSGroupInfo> result = new ArrayList<RSGroupInfo>(resp.size());
for(RSGroupProtos.RSGroupInfo entry: resp) {
result.add(RSGroupSerDe.toGroupInfo(entry));
List<RSGroupProtos.RSGroupInfo> resp = stub.listRSGroupInfos(null,
ListRSGroupInfosRequest.getDefaultInstance()).getRSGroupInfoList();
List<RSGroupInfo> result = new ArrayList<>(resp.size());
for(RSGroupProtos.RSGroupInfo entry : resp) {
result.add(RSGroupProtobufUtil.toGroupInfo(entry));
}
return result;
} catch (ServiceException e) {
@ -173,8 +167,7 @@ class RSGroupAdminClient implements RSGroupAdmin {
@Override
public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException {
RSGroupAdminProtos.GetRSGroupInfoOfServerRequest request =
RSGroupAdminProtos.GetRSGroupInfoOfServerRequest.newBuilder()
GetRSGroupInfoOfServerRequest request = GetRSGroupInfoOfServerRequest.newBuilder()
.setServer(HBaseProtos.ServerName.newBuilder()
.setHostName(hostPort.getHostname())
.setPort(hostPort.getPort())
@ -183,7 +176,7 @@ class RSGroupAdminClient implements RSGroupAdmin {
try {
GetRSGroupInfoOfServerResponse resp = stub.getRSGroupInfoOfServer(null, request);
if (resp.hasRSGroupInfo()) {
return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo());
return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo());
}
return null;
} catch (ServiceException e) {

View File

@ -20,22 +20,21 @@ package org.apache.hadoop.hbase.rsgroup;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Coprocessor;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
@ -43,12 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.locking.LockProcedure.LockType;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos;
@ -71,75 +65,61 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesR
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@InterfaceAudience.Private
public class RSGroupAdminEndpoint extends RSGroupAdminService implements CoprocessorService,
Coprocessor, MasterObserver {
private MasterServices master = null;
public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService {
private static final Log LOG = LogFactory.getLog(RSGroupAdminEndpoint.class);
// TODO: Static? Fix.
private static RSGroupInfoManager groupInfoManager;
private MasterServices master = null;
// Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on
// their setup.
private RSGroupInfoManager groupInfoManager;
private RSGroupAdminServer groupAdminServer;
private final RSGroupAdminService groupAdminService = new RSGroupAdminServiceImpl();
@Override
public void start(CoprocessorEnvironment env) throws IOException {
MasterCoprocessorEnvironment menv = (MasterCoprocessorEnvironment)env;
master = menv.getMasterServices();
setGroupInfoManager(new RSGroupInfoManagerImpl(master));
master = ((MasterCoprocessorEnvironment)env).getMasterServices();
groupInfoManager = RSGroupInfoManagerImpl.getInstance(master);
groupAdminServer = new RSGroupAdminServer(master, groupInfoManager);
Class<?> clazz =
master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
throw new IOException("Configured balancer is not a GroupableBalancer");
throw new IOException("Configured balancer does not support RegionServer groups.");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
}
@Override
public Service getService() {
return this;
}
private static void setStaticGroupInfoManager(RSGroupInfoManagerImpl groupInfoManager) {
RSGroupAdminEndpoint.groupInfoManager = groupInfoManager;
}
private void setGroupInfoManager(RSGroupInfoManagerImpl groupInfoManager) throws IOException {
if (groupInfoManager == null) {
groupInfoManager = new RSGroupInfoManagerImpl(master);
groupInfoManager.init();
} else if (!groupInfoManager.isInit()) {
groupInfoManager.init();
}
setStaticGroupInfoManager(groupInfoManager);
return groupAdminService;
}
RSGroupInfoManager getGroupInfoManager() {
return groupInfoManager;
}
/**
* Implementation of RSGroupAdminService defined in RSGroupAdmin.proto.
* This class calls {@link RSGroupAdminServer} for actual work, converts result to protocol
* buffer response, handles exceptions if any occurred and then calls the {@code RpcCallback} with
* the response.
* Since our CoprocessorHost asks the Coprocessor for a Service
* ({@link CoprocessorService#getService()}) instead of doing "coproc instanceOf Service"
* and requiring Coprocessor itself to be Service (something we do with our observers),
* we can use composition instead of inheritance here. That makes it easy to manage
* functionalities in concise classes (sometimes inner classes) instead of single class doing
* many different things.
*/
private class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService {
@Override
public void getRSGroupInfo(RpcController controller,
GetRSGroupInfoRequest request,
RpcCallback<GetRSGroupInfoResponse> done) {
GetRSGroupInfoResponse.Builder builder =
GetRSGroupInfoResponse.newBuilder();
GetRSGroupInfoRequest request, RpcCallback<GetRSGroupInfoResponse> done) {
GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder();
String groupName = request.getRSGroupName();
try {
RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
if (rsGroupInfo != null) {
builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(rsGroupInfo));
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo));
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@ -149,15 +129,13 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
@Override
public void getRSGroupInfoOfTable(RpcController controller,
GetRSGroupInfoOfTableRequest request,
RpcCallback<GetRSGroupInfoOfTableResponse> done) {
GetRSGroupInfoOfTableResponse.Builder builder =
GetRSGroupInfoOfTableResponse.newBuilder();
GetRSGroupInfoOfTableRequest request, RpcCallback<GetRSGroupInfoOfTableResponse> done) {
GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder();
try {
TableName tableName = ProtobufUtil.toTableName(request.getTableName());
RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName);
if (RSGroupInfo != null) {
builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo));
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@ -166,14 +144,12 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
}
@Override
public void moveServers(RpcController controller,
MoveServersRequest request,
public void moveServers(RpcController controller, MoveServersRequest request,
RpcCallback<MoveServersResponse> done) {
RSGroupAdminProtos.MoveServersResponse.Builder builder =
RSGroupAdminProtos.MoveServersResponse.newBuilder();
MoveServersResponse.Builder builder = MoveServersResponse.newBuilder();
try {
Set<Address> hostPorts = Sets.newHashSet();
for(HBaseProtos.ServerName el: request.getServersList()) {
for (HBaseProtos.ServerName el : request.getServersList()) {
hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
}
groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
@ -184,14 +160,12 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
}
@Override
public void moveTables(RpcController controller,
MoveTablesRequest request,
public void moveTables(RpcController controller, MoveTablesRequest request,
RpcCallback<MoveTablesResponse> done) {
MoveTablesResponse.Builder builder =
MoveTablesResponse.newBuilder();
MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder();
try {
Set<TableName> tables = new HashSet<TableName>(request.getTableNameList().size());
for(HBaseProtos.TableName tableName: request.getTableNameList()) {
Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
for (HBaseProtos.TableName tableName : request.getTableNameList()) {
tables.add(ProtobufUtil.toTableName(tableName));
}
groupAdminServer.moveTables(tables, request.getTargetGroup());
@ -202,11 +176,9 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
}
@Override
public void addRSGroup(RpcController controller,
AddRSGroupRequest request,
public void addRSGroup(RpcController controller, AddRSGroupRequest request,
RpcCallback<AddRSGroupResponse> done) {
AddRSGroupResponse.Builder builder =
AddRSGroupResponse.newBuilder();
AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
try {
groupAdminServer.addRSGroup(request.getRSGroupName());
} catch (IOException e) {
@ -217,8 +189,7 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
@Override
public void removeRSGroup(RpcController controller,
RemoveRSGroupRequest request,
RpcCallback<RemoveRSGroupResponse> done) {
RemoveRSGroupRequest request, RpcCallback<RemoveRSGroupResponse> done) {
RemoveRSGroupResponse.Builder builder =
RemoveRSGroupResponse.newBuilder();
try {
@ -231,8 +202,7 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
@Override
public void balanceRSGroup(RpcController controller,
BalanceRSGroupRequest request,
RpcCallback<BalanceRSGroupResponse> done) {
BalanceRSGroupRequest request, RpcCallback<BalanceRSGroupResponse> done) {
BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder();
try {
builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
@ -245,13 +215,11 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
@Override
public void listRSGroupInfos(RpcController controller,
ListRSGroupInfosRequest request,
RpcCallback<ListRSGroupInfosResponse> done) {
ListRSGroupInfosResponse.Builder builder =
ListRSGroupInfosResponse.newBuilder();
ListRSGroupInfosRequest request, RpcCallback<ListRSGroupInfosResponse> done) {
ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder();
try {
for(RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) {
builder.addRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo));
for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) {
builder.addRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@ -261,38 +229,67 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
@Override
public void getRSGroupInfoOfServer(RpcController controller,
GetRSGroupInfoOfServerRequest request,
RpcCallback<GetRSGroupInfoOfServerResponse> done) {
GetRSGroupInfoOfServerRequest request, RpcCallback<GetRSGroupInfoOfServerResponse> done) {
GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder();
try {
Address hp =
Address.fromParts(request.getServer().getHostName(), request.getServer().getPort());
Address hp = Address.fromParts(request.getServer().getHostName(),
request.getServer().getPort());
RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp);
if (RSGroupInfo != null) {
builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo));
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
done.run(builder.build());
}
}
/////////////////////////////////////////////////////////////////////////////
// MasterObserver overrides
/////////////////////////////////////////////////////////////////////////////
// Assign table to default RSGroup.
@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
groupAdminServer.prepareRSGroupForTable(desc);
String groupName =
master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
if (groupName == null) {
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
if (rsGroupInfo == null) {
throw new ConstraintException("Default RSGroup (" + groupName + ") for this table's "
+ "namespace does not exist.");
}
if (!rsGroupInfo.containsTable(desc.getTableName())) {
LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName);
groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName);
}
}
// Remove table from its RSGroup.
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
groupAdminServer.cleanupRSGroupForTable(tableName);
try {
RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName);
if (group != null) {
LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName,
group.getName()));
groupAdminServer.moveTables(Sets.newHashSet(tableName), null);
}
} catch (IOException ex) {
LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex);
}
}
@Override
public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor ns) throws IOException {
String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
if(group != null && groupAdminServer.getRSGroupInfo(group) == null) {
throw new ConstraintException("Region server group "+group+" does not exit");
}
@ -303,4 +300,5 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce
NamespaceDescriptor ns) throws IOException {
preCreateNamespace(ctx, ns);
}
/////////////////////////////////////////////////////////////////////////////
}

View File

@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.net.Address;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Service to support Region Server Grouping (HBase-6721).
@ -62,23 +60,23 @@ public class RSGroupAdminServer implements RSGroupAdmin {
private MasterServices master;
private final RSGroupInfoManager rsGroupInfoManager;
public RSGroupAdminServer(MasterServices master,
RSGroupInfoManager RSGroupInfoManager) throws IOException {
public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager)
throws IOException {
this.master = master;
this.rsGroupInfoManager = RSGroupInfoManager;
this.rsGroupInfoManager = rsGroupInfoManager;
}
@Override
public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
return getRSGroupInfoManager().getRSGroup(groupName);
return rsGroupInfoManager.getRSGroup(groupName);
}
@Override
public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
// We are reading across two Maps in the below with out synchronizing across
// them; should be safe most of the time.
String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName);
return groupName == null? null: getRSGroupInfoManager().getRSGroup(groupName);
String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
return groupName == null? null: rsGroupInfoManager.getRSGroup(groupName);
}
private void checkOnlineServersOnly(Set<Address> servers) throws ConstraintException {
@ -99,18 +97,17 @@ public class RSGroupAdminServer implements RSGroupAdmin {
/**
* Check passed name. Fail if nulls or if corresponding RSGroupInfo not found.
* @return The RSGroupInfo named <code>name</code>
* @throws IOException
*/
private RSGroupInfo getAndCheckRSGroupInfo(String name)
throws IOException {
if (StringUtils.isEmpty(name)) {
throw new ConstraintException("RSGroup cannot be null.");
}
RSGroupInfo rsgi = getRSGroupInfo(name);
if (rsgi == null) {
RSGroupInfo rsGroupInfo = getRSGroupInfo(name);
if (rsGroupInfo == null) {
throw new ConstraintException("RSGroup does not exist: " + name);
}
return rsgi;
return rsGroupInfo;
}
/**
@ -141,7 +138,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
else regions.addFirst(hri);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
justification="Ignoring complaint because don't know what it is complaining about")
@Override
public void moveServers(Set<Address> servers, String targetGroupName)
@ -155,19 +153,20 @@ public class RSGroupAdminServer implements RSGroupAdmin {
return;
}
RSGroupInfo targetGrp = getAndCheckRSGroupInfo(targetGroupName);
RSGroupInfoManager manager = getRSGroupInfoManager();
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) {
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
}
// Presume first server's source group. Later ensure all servers are from this group.
Address firstServer = servers.iterator().next();
RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer);
RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer);
if (srcGrp == null) {
// Be careful. This exception message is tested for in TestRSGroupsBase...
throw new ConstraintException("Source RSGroup for server " + firstServer + " does not exist.");
throw new ConstraintException("Source RSGroup for server " + firstServer
+ " does not exist.");
}
if (srcGrp.getName().equals(targetGroupName)) {
throw new ConstraintException( "Target RSGroup " + targetGroupName +
@ -180,7 +179,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
// Ensure all servers are of same rsgroup.
for (Address server: servers) {
String tmpGroup = manager.getRSGroupOfServer(server).getName();
String tmpGroup = rsGroupInfoManager.getRSGroupOfServer(server).getName();
if (!tmpGroup.equals(srcGrp.getName())) {
throw new ConstraintException("Move server request should only come from one source " +
"RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
@ -192,7 +191,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
// MovedServers may be < passed in 'servers'.
Set<Address> movedServers = manager.moveServers(servers, srcGrp.getName(), targetGroupName);
Set<Address> movedServers = rsGroupInfoManager.moveServers(servers, srcGrp.getName(),
targetGroupName);
List<Address> editableMovedServers = Lists.newArrayList(movedServers);
boolean foundRegionsToUnassign;
do {
@ -231,7 +231,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
}
try {
manager.wait(1000);
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
@ -247,22 +247,21 @@ public class RSGroupAdminServer implements RSGroupAdmin {
@Override
public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
if (tables == null) {
throw new ConstraintException(
"The list of servers cannot be null.");
throw new ConstraintException("The list of servers cannot be null.");
}
if (tables.size() < 1) {
LOG.debug("moveTables() passed an empty set. Ignoring.");
return;
}
RSGroupInfoManager manager = getRSGroupInfoManager();
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) {
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
}
if(targetGroup != null) {
RSGroupInfo destGroup = manager.getRSGroup(targetGroup);
RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup);
if(destGroup == null) {
throw new ConstraintException("Target " + targetGroup + " RSGroup does not exist.");
}
@ -272,14 +271,14 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
for (TableName table : tables) {
String srcGroup = manager.getRSGroupOfTable(table);
String srcGroup = rsGroupInfoManager.getRSGroupOfTable(table);
if(srcGroup != null && srcGroup.equals(targetGroup)) {
throw new ConstraintException(
"Source RSGroup " + srcGroup + " is same as target " + targetGroup +
" RSGroup for table " + table);
}
}
manager.moveTables(tables, targetGroup);
rsGroupInfoManager.moveTables(tables, targetGroup);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
}
@ -308,7 +307,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preAddRSGroup(name);
}
getRSGroupInfoManager().addRSGroup(new RSGroupInfo(name));
rsGroupInfoManager.addRSGroup(new RSGroupInfo(name));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postAddRSGroup(name);
}
@ -316,37 +315,36 @@ public class RSGroupAdminServer implements RSGroupAdmin {
@Override
public void removeRSGroup(String name) throws IOException {
RSGroupInfoManager manager = getRSGroupInfoManager();
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (manager) {
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveRSGroup(name);
}
RSGroupInfo rsgi = manager.getRSGroup(name);
if (rsgi == null) {
RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name);
if (rsGroupInfo == null) {
throw new ConstraintException("RSGroup " + name + " does not exist");
}
int tableCount = rsgi.getTables().size();
int tableCount = rsGroupInfo.getTables().size();
if (tableCount > 0) {
throw new ConstraintException("RSGroup " + name + " has " + tableCount +
" tables; you must remove these tables from the rsgroup before " +
"the rsgroup can be removed.");
}
int serverCount = rsgi.getServers().size();
int serverCount = rsGroupInfo.getServers().size();
if (serverCount > 0) {
throw new ConstraintException("RSGroup " + name + " has " + serverCount +
" servers; you must remove these servers from the RSGroup before" +
"the RSGroup can be removed.");
}
for (NamespaceDescriptor ns: master.getClusterSchema().getNamespaces()) {
String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
String nsGroup = ns.getConfigurationValue(rsGroupInfo.NAMESPACE_DESC_PROP_GROUP);
if (nsGroup != null && nsGroup.equals(name)) {
throw new ConstraintException("RSGroup " + name + " is referenced by namespace: " +
ns.getName());
}
}
manager.removeRSGroup(name);
rsGroupInfoManager.removeRSGroup(name);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveRSGroup(name);
}
@ -370,9 +368,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Only allow one balance run at at time.
Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
if (groupRIT.size() > 0) {
LOG.debug("Not running balancer because " +
groupRIT.size() +
" region(s) in transition: " +
LOG.debug("Not running balancer because " + groupRIT.size() + " region(s) in transition: " +
StringUtils.abbreviate(
master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(),
256));
@ -388,9 +384,10 @@ public class RSGroupAdminServer implements RSGroupAdmin {
List<RegionPlan> plans = new ArrayList<RegionPlan>();
for(Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> tableMap:
getRSGroupAssignmentsByTable(groupName).entrySet()) {
LOG.info("Creating partial plan for table "+tableMap.getKey()+": "+tableMap.getValue());
LOG.info("Creating partial plan for table " + tableMap.getKey() + ": "
+ tableMap.getValue());
List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
LOG.info("Partial plan for table "+tableMap.getKey()+": "+partialPlans);
LOG.info("Partial plan for table " + tableMap.getKey() + ": " + partialPlans);
if (partialPlans != null) {
plans.addAll(partialPlans);
}
@ -398,13 +395,13 @@ public class RSGroupAdminServer implements RSGroupAdmin {
long startTime = System.currentTimeMillis();
balancerRan = plans != null;
if (plans != null && !plans.isEmpty()) {
LOG.info("RSGroup balance "+groupName+" starting with plan count: "+plans.size());
LOG.info("RSGroup balance " + groupName + " starting with plan count: " + plans.size());
for (RegionPlan plan: plans) {
LOG.info("balance " + plan);
assignmentManager.balance(plan);
}
LOG.info("RSGroup balance "+groupName+" completed after "+
(System.currentTimeMillis()-startTime)+" seconds");
LOG.info("RSGroup balance " + groupName + " completed after " +
(System.currentTimeMillis()-startTime) + " seconds");
}
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
@ -415,27 +412,21 @@ public class RSGroupAdminServer implements RSGroupAdmin {
@Override
public List<RSGroupInfo> listRSGroups() throws IOException {
return getRSGroupInfoManager().listRSGroups();
return rsGroupInfoManager.listRSGroups();
}
@Override
public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException {
return getRSGroupInfoManager().getRSGroupOfServer(hostPort);
}
private RSGroupInfoManager getRSGroupInfoManager() throws IOException {
return rsGroupInfoManager;
return rsGroupInfoManager.getRSGroupOfServer(hostPort);
}
private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
throws IOException {
Map<String, RegionState> rit = Maps.newTreeMap();
AssignmentManager am = master.getAssignmentManager();
RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
for(TableName tableName : RSGroupInfo.getTables()) {
for(TableName tableName : getRSGroupInfo(groupName).getTables()) {
for(HRegionInfo regionInfo: am.getRegionStates().getRegionsOfTable(tableName)) {
RegionState state =
master.getAssignmentManager().getRegionStates().getRegionTransitionState(regionInfo);
RegionState state = am.getRegionStates().getRegionTransitionState(regionInfo);
if(state != null) {
rit.put(regionInfo.getEncodedName(), state);
}
@ -447,72 +438,37 @@ public class RSGroupAdminServer implements RSGroupAdmin {
private Map<TableName, Map<ServerName, List<HRegionInfo>>>
getRSGroupAssignmentsByTable(String groupName) throws IOException {
Map<TableName, Map<ServerName, List<HRegionInfo>>> result = Maps.newHashMap();
RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
Map<TableName, Map<ServerName, List<HRegionInfo>>> assignments = Maps.newHashMap();
for(Map.Entry<HRegionInfo, ServerName> entry:
master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
TableName currTable = entry.getKey().getTable();
ServerName currServer = entry.getValue();
HRegionInfo currRegion = entry.getKey();
if(RSGroupInfo.getTables().contains(currTable)) {
if(!assignments.containsKey(entry.getKey().getTable())) {
assignments.put(currTable, new HashMap<ServerName, List<HRegionInfo>>());
}
if(!assignments.get(currTable).containsKey(currServer)) {
assignments.get(currTable).put(currServer, new ArrayList<HRegionInfo>());
}
if (rsGroupInfo.getTables().contains(currTable)) {
assignments.putIfAbsent(currTable, new HashMap<>());
assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
assignments.get(currTable).get(currServer).add(currRegion);
}
}
Map<ServerName, List<HRegionInfo>> serverMap = Maps.newHashMap();
for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) {
if(RSGroupInfo.getServers().contains(serverName.getAddress())) {
if(rsGroupInfo.getServers().contains(serverName.getAddress())) {
serverMap.put(serverName, Collections.emptyList());
}
}
//add all tables that are members of the group
for(TableName tableName : RSGroupInfo.getTables()) {
// add all tables that are members of the group
for(TableName tableName : rsGroupInfo.getTables()) {
if(assignments.containsKey(tableName)) {
result.put(tableName, new HashMap<ServerName, List<HRegionInfo>>());
result.put(tableName, new HashMap<>());
result.get(tableName).putAll(serverMap);
result.get(tableName).putAll(assignments.get(tableName));
LOG.debug("Adding assignments for "+tableName+": "+assignments.get(tableName));
LOG.debug("Adding assignments for " + tableName + ": " + assignments.get(tableName));
}
}
return result;
}
public void prepareRSGroupForTable(HTableDescriptor desc) throws IOException {
String groupName =
master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
if (groupName == null) {
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
if (RSGroupInfo == null) {
throw new ConstraintException("RSGroup " + groupName + " does not exist.");
}
if (!RSGroupInfo.containsTable(desc.getTableName())) {
LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName);
moveTables(Sets.newHashSet(desc.getTableName()), groupName);
}
}
public void cleanupRSGroupForTable(TableName tableName) throws IOException {
try {
RSGroupInfo group = getRSGroupInfoOfTable(tableName);
if (group != null) {
LOG.debug("Removing deleted table from table rsgroup " + group.getName());
moveTables(Sets.newHashSet(tableName), null);
}
} catch (ConstraintException ex) {
LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex);
} catch (IOException ex) {
LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex);
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
@ -38,7 +39,6 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseIOException;
@ -70,10 +70,7 @@ import org.apache.hadoop.util.ReflectionUtils;
*
*/
@InterfaceAudience.Private
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
/** Config for pluggable load balancers */
public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class);
private Configuration config;
@ -82,16 +79,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
private volatile RSGroupInfoManager rsGroupInfoManager;
private LoadBalancer internalBalancer;
//used during reflection by LoadBalancerFactory
/**
* Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
*/
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer() {}
//This constructor should only be used for unit testing
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer(RSGroupInfoManager rsGroupInfoManager) {
this.rsGroupInfoManager = rsGroupInfoManager;
}
@Override
public Configuration getConf() {
return config;
@ -112,11 +105,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
this.masterServices = masterServices;
}
@Override
public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
}
@Override
public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
clusterState) throws HBaseIOException {
@ -384,8 +372,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
}
// Create the balancer
Class<? extends LoadBalancer> balancerKlass = config.getClass(
HBASE_GROUP_LOADBALANCER_CLASS,
Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
StochasticLoadBalancer.class, LoadBalancer.class);
internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
internalBalancer.setMasterServices(masterServices);
@ -399,6 +386,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
return this.rsGroupInfoManager.isOnline();
}
@Override
public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
}
@Override
public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
}
@ -420,4 +411,9 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public boolean isStopped() {
return false;
}
@VisibleForTesting
public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
this.rsGroupInfoManager = rsGroupInfoManager;
}
}

View File

@ -37,28 +37,21 @@ import org.apache.hadoop.hbase.net.Address;
@InterfaceAudience.Private
public interface RSGroupInfoManager {
//Assigned before user tables
public static final TableName RSGROUP_TABLE_NAME =
TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
public static final byte[] RSGROUP_TABLE_NAME_BYTES = RSGROUP_TABLE_NAME.toBytes();
public static final String rsGroupZNode = "rsgroup";
public static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
public static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
public static final byte[] ROW_KEY = {0};
byte[] RSGROUP_TABLE_NAME_BYTES = RSGROUP_TABLE_NAME.toBytes();
String rsGroupZNode = "rsgroup";
byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
byte[] ROW_KEY = {0};
/**
* Adds the group.
*
* @param rsGroupInfo the group name
* @throws java.io.IOException Signals that an I/O exception has occurred.
* Add given RSGroupInfo to existing list of group infos.
*/
void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException;
/**
* Remove a region server group.
*
* @param groupName the group name
* @throws java.io.IOException Signals that an I/O exception has occurred.
*/
void removeRSGroup(String groupName) throws IOException;
@ -68,32 +61,22 @@ public interface RSGroupInfoManager {
* @param srcGroup groupName being moved from
* @param dstGroup groupName being moved to
* @return Set of servers moved (May be a subset of {@code servers}).
* @throws java.io.IOException on move failure
*/
Set<Address> moveServers(Set<Address> servers,
String srcGroup, String dstGroup) throws IOException;
Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
throws IOException;
/**
* Gets the group info of server.
*
* @param hostPort the server
* @return An instance of RSGroupInfo
*/
RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException;
RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException;
/**
* Gets the group information.
*
* @param groupName the group name
* @return An instance of RSGroupInfo
* Gets {@code RSGroupInfo} for the given group name.
*/
RSGroupInfo getRSGroup(String groupName) throws IOException;
/**
* Get the group membership of a table
* @param tableName name of table to get group membership
* @return Group name of table
* @throws java.io.IOException on failure to retrive information
*/
String getRSGroupOfTable(TableName tableName) throws IOException;
@ -102,29 +85,21 @@ public interface RSGroupInfoManager {
*
* @param tableNames set of tables to move
* @param groupName name of group of tables to move to
* @throws java.io.IOException on failure to move
*/
void moveTables(Set<TableName> tableNames, String groupName) throws IOException;
/**
* List the groups
*
* @return list of RSGroupInfo
* @throws java.io.IOException on failure
* List the existing {@code RSGroupInfo}s.
*/
List<RSGroupInfo> listRSGroups() throws IOException;
/**
* Refresh/reload the group information from
* the persistent store
*
* @throws java.io.IOException on failure to refresh
* Refresh/reload the group information from the persistent store
*/
void refresh() throws IOException;
/**
* Whether the manager is able to fully
* return group metadata
* Whether the manager is able to fully return group metadata
*
* @return whether the manager is in online mode
*/

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.rsgroup;
import static org.apache.hadoop.hbase.rsgroup.Utility.getOnlineServers;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -57,10 +57,12 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
@ -92,11 +94,11 @@ import com.google.protobuf.ServiceException;
*
* <h2>Concurrency</h2>
* RSGroup state is kept locally in Maps. There is a rsgroup name to cached
* RSGroupInfo Map at this.rsGroupMap and a Map of tables to the name of the
* rsgroup they belong too (in this.tableMap). These Maps are persisted to the
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
* rsgroup they belong too (in {@link #tableMap}). These Maps are persisted to the
* hbase:rsgroup table (and cached in zk) on each modification.
*
* <p>Mutations on state are synchronized but so reads can continue without having
* <p>Mutations on state are synchronized but reads can continue without having
* to wait on an instance monitor, mutations do wholesale replace of the Maps on
* update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
* (see flushConfig).
@ -109,7 +111,7 @@ import com.google.protobuf.ServiceException;
* no other has access concurrently. Reads must be able to continue concurrently.
*/
@InterfaceAudience.Private
public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class);
/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
@ -132,43 +134,35 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
private volatile Map<TableName, String> tableMap = Collections.emptyMap();
private final MasterServices master;
private final MasterServices masterServices;
private Table rsGroupTable;
private final ClusterConnection conn;
private final ZooKeeperWatcher watcher;
private RSGroupStartupWorker rsGroupStartupWorker;
private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
// contains list of groups that were last flushed to persistent store
private Set<String> prevRSGroups = new HashSet<String>();
private final RSGroupSerDe rsGroupSerDe = new RSGroupSerDe();
private DefaultServerUpdater defaultServerUpdater;
private boolean init = false;
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
this.master = master;
this.watcher = master.getZooKeeper();
this.conn = master.getClusterConnection();
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
this.watcher = masterServices.getZooKeeper();
this.conn = masterServices.getClusterConnection();
}
public synchronized void init() throws IOException{
if (this.init) return;
rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
private synchronized void init() throws IOException{
refresh();
rsGroupStartupWorker.start();
defaultServerUpdater = new DefaultServerUpdater(this);
master.getServerManager().registerListener(this);
defaultServerUpdater.start();
this.init = true;
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
}
synchronized boolean isInit() {
return init;
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
instance.init();
return instance;
}
/**
* Adds the group.
*
* @param rsGroupInfo the group name
*/
@Override
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
checkGroupName(rsGroupInfo.getName());
@ -182,29 +176,23 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
RSGroupInfo rsgi = null;
try {
rsgi = getRSGroup(groupName);
} catch (IOException ioe) {
// Will never happen
throw new DoNotRetryIOException(ioe);
}
if (rsgi == null) {
RSGroupInfo rsGroupInfo = getRSGroup(groupName);
if (rsGroupInfo == null) {
throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
}
return rsgi;
return rsGroupInfo;
}
@Override
public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
throws IOException {
public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
String dstGroup) throws IOException {
RSGroupInfo src = getRSGroupInfo(srcGroup);
RSGroupInfo dst = getRSGroupInfo(dstGroup);
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop it.
// If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a rsgroup
// of dead servers that are to come back later).
// If destination is 'default' rsgroup, only add servers that are online. If not online, drop
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
// rsgroup of dead servers that are to come back later).
Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
getOnlineServers(this.master): null;
Utility.getOnlineServers(this.masterServices): null;
for (Address el: servers) {
src.removeServer(el);
if (onlineServers != null) {
@ -224,45 +212,29 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
return dst.getServers();
}
/**
* Gets the group info of server.
*
* @param hostPort the server
* @return An instance of GroupInfo.
*/
@Override
public RSGroupInfo getRSGroupOfServer(Address hostPort)
throws IOException {
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
for (RSGroupInfo info: rsGroupMap.values()) {
if (info.containsServer(hostPort)) {
if (info.containsServer(serverHostPort)) {
return info;
}
}
return null;
}
/**
* Gets the group information.
*
* @param groupName
* the group name
* @return An instance of GroupInfo
*/
@Override
public RSGroupInfo getRSGroup(String groupName) throws IOException {
return this.rsGroupMap.get(groupName);
public RSGroupInfo getRSGroup(String groupName) {
return rsGroupMap.get(groupName);
}
@Override
public String getRSGroupOfTable(TableName tableName) throws IOException {
public String getRSGroupOfTable(TableName tableName) {
return tableMap.get(tableName);
}
@Override
public synchronized void moveTables(
Set<TableName> tableNames, String groupName) throws IOException {
public synchronized void moveTables(Set<TableName> tableNames, String groupName)
throws IOException {
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
}
@ -280,21 +252,14 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
newGroupMap.put(dst.getName(), dst);
}
}
flushConfig(newGroupMap);
}
/**
* Delete a region server group.
*
* @param groupName the group name
* @throws java.io.IOException Signals that an I/O exception has occurred.
*/
@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
+ "group");
}
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.remove(groupName);
@ -302,7 +267,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
@Override
public List<RSGroupInfo> listRSGroups() throws IOException {
public List<RSGroupInfo> listRSGroups() {
return Lists.newLinkedList(rsGroupMap.values());
}
@ -311,6 +276,41 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
return rsGroupStartupWorker.isOnline();
}
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
for (Result result : rsGroupTable.getScanner(new Scan())) {
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
}
return rsGroupInfoList;
}
List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
//Overwrite any info stored by table, this takes precedence
try {
if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
if(data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
ByteArrayInputStream bis = new ByteArrayInputStream(
data, ProtobufUtil.lengthOfPBMagic(), data.length);
RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
RSGroupProtos.RSGroupInfo.parseFrom(bis)));
}
}
LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
}
} catch (KeeperException|DeserializationException|InterruptedException e) {
throw new IOException("Failed to read rsGroupZNode",e);
}
return RSGroupInfoList;
}
@Override
public void refresh() throws IOException {
refresh(false);
@ -319,11 +319,9 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
/**
* Read rsgroup info from the source of truth, the hbase:rsgroup table.
* Update zk cache. Called on startup of the manager.
* @param forceOnline
* @throws IOException
*/
private synchronized void refresh(boolean forceOnline) throws IOException {
List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
List<RSGroupInfo> groupList = new LinkedList<>();
// Overwrite anything read from zk, group table is source of truth
// if online read from GROUP table
@ -332,29 +330,25 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
if (rsGroupTable == null) {
rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
}
groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
groupList.addAll(retrieveGroupListFromGroupTable());
} else {
LOG.debug("Refreshing in Offline mode.");
String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
groupList.addAll(retrieveGroupListFromZookeeper());
}
// refresh default group, prune
NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
for(String entry: master.getTableDescriptors().getAll().keySet()) {
NavigableSet<TableName> orphanTables = new TreeSet<>();
for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
orphanTables.add(TableName.valueOf(entry));
}
List<TableName> specialTables;
if(!master.isInitialized()) {
specialTables = new ArrayList<TableName>(4);
specialTables.add(AccessControlLists.ACL_TABLE_NAME);
specialTables.add(TableName.META_TABLE_NAME);
specialTables.add(TableName.NAMESPACE_TABLE_NAME);
specialTables.add(RSGROUP_TABLE_NAME);
final List<TableName> specialTables;
if(!masterServices.isInitialized()) {
specialTables = Arrays.asList(AccessControlLists.ACL_TABLE_NAME, TableName.META_TABLE_NAME,
TableName.NAMESPACE_TABLE_NAME, RSGROUP_TABLE_NAME);
} else {
specialTables =
master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
masterServices.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
}
for (TableName table : specialTables) {
@ -380,26 +374,26 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
newTableMap.put(table, group.getName());
}
}
installNewMaps(newGroupMap, newTableMap);
resetRSGroupAndTableMaps(newGroupMap, newTableMap);
updateCacheOfRSGroups(rsGroupMap.keySet());
}
private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
throws IOException {
Map<TableName,String> newTableMap = Maps.newHashMap();
List<Mutation> mutations = Lists.newArrayList();
// populate deletes
for(String groupName : prevRSGroups) {
if(!newGroupMap.containsKey(groupName)) {
if(!groupMap.containsKey(groupName)) {
Delete d = new Delete(Bytes.toBytes(groupName));
mutations.add(d);
}
}
// populate puts
for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
RSGroupProtos.RSGroupInfo proto = RSGroupSerDe.toProtoGroupInfo(RSGroupInfo);
for(RSGroupInfo RSGroupInfo : groupMap.values()) {
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
@ -441,13 +435,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
newTableMap = flushConfigTable(newGroupMap);
// Make changes visible after having been persisted to the source of truth
installNewMaps(newGroupMap, newTableMap);
resetRSGroupAndTableMaps(newGroupMap, newTableMap);
try {
String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
for(String groupName : prevRSGroups) {
if(!newGroupMap.containsKey(groupName)) {
String znode = ZKUtil.joinZNode(groupBasePath, groupName);
@ -458,7 +452,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName());
RSGroupProtos.RSGroupInfo proto = RSGroupSerDe.toProtoGroupInfo(RSGroupInfo);
RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
LOG.debug("Updating znode: "+znode);
ZKUtil.createAndFailSilent(watcher, znode);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
@ -470,7 +464,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
ZKUtil.multiOrSequential(watcher, zkOps, false);
} catch (KeeperException e) {
LOG.error("Failed to write to rsGroupZNode", e);
master.abort("Failed to write to rsGroupZNode", e);
masterServices.abort("Failed to write to rsGroupZNode", e);
throw new IOException("Failed to write to rsGroupZNode",e);
}
updateCacheOfRSGroups(newGroupMap.keySet());
@ -480,7 +474,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
* Make changes visible.
* Caller must be synchronized on 'this'.
*/
private void installNewMaps(Map<String, RSGroupInfo> newRSGroupMap,
private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
Map<TableName, String> newTableMap) {
// Make maps Immutable.
this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
@ -499,22 +493,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
// Called by getDefaultServers. Presume it has lock in place.
private List<ServerName> getOnlineRS() throws IOException {
if (master != null) {
return master.getServerManager().getOnlineServersList();
if (masterServices != null) {
return masterServices.getServerManager().getOnlineServersList();
}
try {
LOG.debug("Reading online RS from zookeeper");
List<ServerName> servers = new LinkedList<ServerName>();
List<ServerName> servers = new LinkedList<>();
try {
for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode)) {
servers.add(ServerName.parseServerName(el));
}
return servers;
} catch (KeeperException e) {
throw new IOException("Failed to retrieve server list from zookeeper", e);
}
return servers;
}
// Called by DefaultServerUpdater. Presume it has lock on this manager when it runs.
// Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
private SortedSet<Address> getDefaultServers() throws IOException {
SortedSet<Address> defaultServers = Sets.newTreeSet();
for (ServerName serverName : getOnlineRS()) {
@ -535,7 +529,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
return defaultServers;
}
// Called by DefaultServerUpdater. Synchronize on this because redoing
// Called by ServerEventsListenerThread. Synchronize on this because redoing
// the rsGroupMap then writing it out.
private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
@ -545,42 +539,47 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
flushConfig(newGroupMap);
}
/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
* As a listener, we need to return immediately, so the real work of updating the servers is
* done asynchronously in this thread.
*/
private class ServerEventsListenerThread extends Thread implements ServerListener {
private final Log LOG = LogFactory.getLog(ServerEventsListenerThread.class);
private boolean changed = false;
ServerEventsListenerThread() {
setDaemon(true);
}
@Override
public void serverAdded(ServerName serverName) {
// #serverChanged is internally synchronized
defaultServerUpdater.serverChanged();
serverChanged();
}
@Override
public void serverRemoved(ServerName serverName) {
// #serverChanged is internally synchronized
defaultServerUpdater.serverChanged();
serverChanged();
}
// TODO: Why do we need this extra thread? Why can't we just go
// fetch at balance time or admin time?
private static class DefaultServerUpdater extends Thread {
private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
private final RSGroupInfoManagerImpl mgr;
private boolean changed = false;
public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
super("RSGroup.ServerUpdater");
setDaemon(true);
this.mgr = mgr;
private synchronized void serverChanged() {
changed = true;
this.notify();
}
@Override
public void run() {
setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
SortedSet<Address> prevDefaultServers = new TreeSet<>();
while(isMasterRunning(this.mgr.master)) {
while(isMasterRunning(masterServices)) {
try {
LOG.info("Updating default servers.");
SortedSet<Address> servers = mgr.getDefaultServers();
SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
if (!servers.equals(prevDefaultServers)) {
mgr.updateDefaultServers(servers);
RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
prevDefaultServers = servers;
LOG.info("Updated with servers: " + servers.size());
LOG.info("Updated with servers: "+servers.size());
}
try {
synchronized (this) {
@ -597,46 +596,31 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
}
}
public void serverChanged() {
synchronized (this) {
changed = true;
this.notify();
}
}
}
private static class RSGroupStartupWorker extends Thread {
private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
private class RSGroupStartupWorker extends Thread {
private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
private volatile boolean online = false;
private final MasterServices masterServices;
private final RSGroupInfoManagerImpl groupInfoManager;
private final ClusterConnection conn;
public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager,
MasterServices masterServices,
ClusterConnection conn) {
this.masterServices = masterServices;
this.groupInfoManager = groupInfoManager;
this.conn = conn;
setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
RSGroupStartupWorker() {
setDaemon(true);
}
@Override
public void run() {
setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
if (waitForGroupTableOnline()) {
LOG.info("GroupBasedLoadBalancer is now online");
}
}
public boolean waitForGroupTableOnline() {
final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
private boolean waitForGroupTableOnline() {
final List<HRegionInfo> foundRegions = new LinkedList<>();
final List<HRegionInfo> assignedRegions = new LinkedList<>();
final AtomicBoolean found = new AtomicBoolean(false);
final TableStateManager tsm = masterServices.getTableStateManager();
boolean createSent = false;
while (!found.get() && isMasterRunning(this.masterServices)) {
while (!found.get() && isMasterRunning(masterServices)) {
foundRegions.clear();
assignedRegions.clear();
found.set(true);
@ -703,7 +687,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
MetaTableAccessor.fullScanRegions(conn, visitor);
// if no regions in meta then we have to create the table
if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
groupInfoManager.createRSGroupTable(masterServices);
createRSGroupTable();
createSent = true;
}
LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
@ -717,10 +701,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
if (found.get()) {
LOG.debug("With group table online, refreshing cached information.");
groupInfoManager.refresh(true);
RSGroupInfoManagerImpl.this.refresh(true);
online = true;
//flush any inconsistencies between ZK and HTable
groupInfoManager.flushConfig();
RSGroupInfoManagerImpl.this.flushConfig();
}
} catch (RuntimeException e) {
throw e;
@ -737,16 +721,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
return found.get();
}
public boolean isOnline() {
return online;
}
}
private static boolean isMasterRunning(MasterServices masterServices) {
return !masterServices.isAborted() && !masterServices.isStopped();
}
private void createRSGroupTable(MasterServices masterServices) throws IOException {
private void createRSGroupTable() throws IOException {
Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
// wait for region to be online
int tries = 600;
@ -756,11 +731,11 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new IOException("Wait interrupted", e);
throw new IOException("Wait interrupted ", e);
}
tries--;
}
if (tries <= 0) {
if(tries <= 0) {
throw new IOException("Failed to create group table in a given time.");
} else {
ProcedureInfo result = masterServices.getMasterProcedureExecutor().getResult(procId);
@ -770,8 +745,16 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
}
private void multiMutate(List<Mutation> mutations)
throws IOException {
public boolean isOnline() {
return online;
}
}
private static boolean isMasterRunning(MasterServices masterServices) {
return !masterServices.isAborted() && !masterServices.isStopped();
}
private void multiMutate(List<Mutation> mutations) throws IOException {
CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
= MultiRowMutationProtos.MutateRowsRequest.newBuilder();

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
@InterfaceAudience.Private
class RSGroupProtobufUtil {
static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) {
RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName());
for(HBaseProtos.ServerName el: proto.getServersList()) {
RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
}
for(HBaseProtos.TableName pTableName: proto.getTablesList()) {
RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName));
}
return RSGroupInfo;
}
static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) {
List<HBaseProtos.TableName> tables = new ArrayList<>(pojo.getTables().size());
for(TableName arg: pojo.getTables()) {
tables.add(ProtobufUtil.toProtoTableName(arg));
}
List<HBaseProtos.ServerName> hostports = new ArrayList<>(pojo.getServers().size());
for(Address el: pojo.getServers()) {
hostports.add(HBaseProtos.ServerName.newBuilder()
.setHostName(el.getHostname())
.setPort(el.getPort())
.build());
}
return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName())
.addAllServers(hostports)
.addAllTables(tables).build();
}
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.Lists;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
@InterfaceAudience.Private
public class RSGroupSerDe {
private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class);
public RSGroupSerDe() {super();}
public List<RSGroupInfo> retrieveGroupList(Table groupTable) throws IOException {
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
for (Result result : groupTable.getScanner(new Scan())) {
RSGroupProtos.RSGroupInfo proto =
RSGroupProtos.RSGroupInfo.parseFrom(
result.getValue(
RSGroupInfoManager.META_FAMILY_BYTES,
RSGroupInfoManager.META_QUALIFIER_BYTES));
RSGroupInfoList.add(toGroupInfo(proto));
}
return RSGroupInfoList;
}
public List<RSGroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
String groupBasePath) throws IOException {
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
//Overwrite any info stored by table, this takes precedence
try {
if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
if(data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
ByteArrayInputStream bis = new ByteArrayInputStream(
data, ProtobufUtil.lengthOfPBMagic(), data.length);
RSGroupInfoList.add(toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
}
}
LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
}
} catch (KeeperException e) {
throw new IOException("Failed to read rsGroupZNode",e);
} catch (DeserializationException e) {
throw new IOException("Failed to read rsGroupZNode",e);
} catch (InterruptedException e) {
throw new IOException("Failed to read rsGroupZNode",e);
}
return RSGroupInfoList;
}
public static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) {
RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName());
for(HBaseProtos.ServerName el: proto.getServersList()) {
RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
}
for(HBaseProtos.TableName pTableName: proto.getTablesList()) {
RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName));
}
return RSGroupInfo;
}
public static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) {
List<HBaseProtos.TableName> tables =
new ArrayList<HBaseProtos.TableName>(pojo.getTables().size());
for(TableName arg: pojo.getTables()) {
tables.add(ProtobufUtil.toProtoTableName(arg));
}
List<HBaseProtos.ServerName> hostports =
new ArrayList<HBaseProtos.ServerName>(pojo.getServers().size());
for(Address el: pojo.getServers()) {
hostports.add(HBaseProtos.ServerName.newBuilder()
.setHostName(el.getHostname())
.setPort(el.getPort())
.build());
}
return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName())
.addAllServers(hostports)
.addAllTables(tables).build();
}
}

View File

@ -26,4 +26,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
* marked with this Interface before it runs.
*/
@InterfaceAudience.Private
public interface RSGroupableBalancer extends LoadBalancer {}
public interface RSGroupableBalancer extends LoadBalancer {
/** Config for pluggable load balancers */
String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";
}

View File

@ -93,8 +93,9 @@ public class TestRSGroupBasedLoadBalancer {
tableDescs = constructTableDesc();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.regions.slop", "0");
conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager());
conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
loadBalancer = new RSGroupBasedLoadBalancer();
loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager());
loadBalancer.setMasterServices(getMockedMaster());
loadBalancer.setConf(conf);
loadBalancer.initialize();

View File

@ -160,9 +160,9 @@ public class TestRSGroups extends TestRSGroupsBase {
LOG.info("testNamespaceCreateAndAssign");
String nsName = tablePrefix+"_foo";
final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign");
RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1);
RSGroupInfo appInfo = addGroup("appInfo", 1);
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "appInfo").build());
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "appInfo").build());
final HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
admin.createTable(desc);
@ -186,7 +186,7 @@ public class TestRSGroups extends TestRSGroupsBase {
LOG.info("testDefaultNamespaceCreateAndAssign");
final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign");
admin.modifyNamespace(NamespaceDescriptor.create("default")
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "default").build());
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "default").build());
final HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
admin.createTable(desc);
@ -206,7 +206,7 @@ public class TestRSGroups extends TestRSGroupsBase {
LOG.info("testNamespaceConstraint");
rsGroupAdmin.addRSGroup(groupName);
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
.build());
//test removing a referenced group
try {
@ -218,7 +218,7 @@ public class TestRSGroups extends TestRSGroupsBase {
//changing with the same name is fine
admin.modifyNamespace(
NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
.build());
String anotherGroup = tablePrefix+"_anotherGroup";
rsGroupAdmin.addRSGroup(anotherGroup);
@ -227,7 +227,7 @@ public class TestRSGroups extends TestRSGroupsBase {
rsGroupAdmin.removeRSGroup(groupName);
try {
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "foo")
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
.build());
fail("Expected a constraint exception");
} catch (IOException ex) {
@ -250,7 +250,7 @@ public class TestRSGroups extends TestRSGroupsBase {
final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
LOG.info("testMisplacedRegions");
final RSGroupInfo RSGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1);
final RSGroupInfo RSGroupInfo = addGroup("testMisplacedRegions", 1);
TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);

View File

@ -53,13 +53,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.rules.TestName;
public abstract class TestRSGroupsBase {
protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class);
@Rule
public TestName name = new TestName();
//shared
protected final static String groupPrefix = "Group";
@ -75,15 +80,20 @@ public abstract class TestRSGroupsBase {
public final static long WAIT_TIMEOUT = 60000*5;
public final static int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster
// Per test variables
TableName tableName;
@Before
public void setup() {
LOG.info(name.getMethodName());
tableName = TableName.valueOf(tablePrefix + "_" + name.getMethodName());
}
protected RSGroupInfo addGroup(RSGroupAdmin gAdmin, String groupName,
int serverCount) throws IOException, InterruptedException {
RSGroupInfo defaultInfo = gAdmin
.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
protected RSGroupInfo addGroup(String groupName, int serverCount)
throws IOException, InterruptedException {
RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
assertTrue(defaultInfo != null);
assertTrue(defaultInfo.getServers().size() >= serverCount);
gAdmin.addRSGroup(groupName);
rsGroupAdmin.addRSGroup(groupName);
Set<Address> set = new HashSet<Address>();
for(Address server: defaultInfo.getServers()) {
@ -92,17 +102,17 @@ public abstract class TestRSGroupsBase {
}
set.add(server);
}
gAdmin.moveServers(set, groupName);
RSGroupInfo result = gAdmin.getRSGroupInfo(groupName);
rsGroupAdmin.moveServers(set, groupName);
RSGroupInfo result = rsGroupAdmin.getRSGroupInfo(groupName);
assertTrue(result.getServers().size() >= serverCount);
return result;
}
static void removeGroup(RSGroupAdminClient groupAdmin, String groupName) throws IOException {
RSGroupInfo RSGroupInfo = groupAdmin.getRSGroupInfo(groupName);
groupAdmin.moveTables(RSGroupInfo.getTables(), RSGroupInfo.DEFAULT_GROUP);
groupAdmin.moveServers(RSGroupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
groupAdmin.removeRSGroup(groupName);
void removeGroup(String groupName) throws IOException {
RSGroupInfo RSGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName);
rsGroupAdmin.moveTables(RSGroupInfo.getTables(), RSGroupInfo.DEFAULT_GROUP);
rsGroupAdmin.moveServers(RSGroupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
rsGroupAdmin.removeRSGroup(groupName);
}
protected void deleteTableIfNecessary() throws IOException {
@ -137,7 +147,7 @@ public abstract class TestRSGroupsBase {
= getTableServerRegionMap();
for(TableName tableName : tableServerRegionMap.keySet()) {
if(!map.containsKey(tableName)) {
map.put(tableName, new LinkedList<String>());
map.put(tableName, new LinkedList<>());
}
for(List<String> subset: tableServerRegionMap.get(tableName).values()) {
map.get(tableName).addAll(subset);
@ -154,10 +164,10 @@ public abstract class TestRSGroupsBase {
for(RegionLoad rl : status.getLoad(serverName).getRegionsLoad().values()) {
TableName tableName = HRegionInfo.getTable(rl.getName());
if(!map.containsKey(tableName)) {
map.put(tableName, new TreeMap<ServerName, List<String>>());
map.put(tableName, new TreeMap<>());
}
if(!map.get(tableName).containsKey(serverName)) {
map.get(tableName).put(serverName, new LinkedList<String>());
map.get(tableName).put(serverName, new LinkedList<>());
}
map.get(tableName).get(serverName).add(rl.getNameAsString());
}
@ -202,8 +212,6 @@ public abstract class TestRSGroupsBase {
@Test
public void testCreateMultiRegion() throws IOException {
LOG.info("testCreateMultiRegion");
TableName tableName = TableName.valueOf(tablePrefix + "_testCreateMultiRegion");
byte[] end = {1,3,5,7,9};
byte[] start = {0,2,4,6,8};
byte[][] f = {Bytes.toBytes("f")};
@ -212,9 +220,6 @@ public abstract class TestRSGroupsBase {
@Test
public void testCreateAndDrop() throws Exception {
LOG.info("testCreateAndDrop");
final TableName tableName = TableName.valueOf(tablePrefix + "_testCreateAndDrop");
TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
//wait for created table to be assigned
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@ -230,11 +235,9 @@ public abstract class TestRSGroupsBase {
@Test
public void testSimpleRegionServerMove() throws IOException,
InterruptedException {
LOG.info("testSimpleRegionServerMove");
int initNumGroups = rsGroupAdmin.listRSGroups().size();
RSGroupInfo appInfo = addGroup(rsGroupAdmin, getGroupName("testSimpleRegionServerMove"), 1);
RSGroupInfo adminInfo = addGroup(rsGroupAdmin, getGroupName("testSimpleRegionServerMove"), 1);
RSGroupInfo appInfo = addGroup(getGroupName(name.getMethodName()), 1);
RSGroupInfo adminInfo = addGroup(getGroupName(name.getMethodName()), 1);
RSGroupInfo dInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
Assert.assertEquals(initNumGroups + 2, rsGroupAdmin.listRSGroups().size());
assertEquals(1, adminInfo.getServers().size());
@ -264,10 +267,8 @@ public abstract class TestRSGroupsBase {
@Test
public void testMoveServers() throws Exception {
LOG.info("testMoveServers");
//create groups and assign servers
addGroup(rsGroupAdmin, "bar", 3);
addGroup("bar", 3);
rsGroupAdmin.addRSGroup("foo");
RSGroupInfo barGroup = rsGroupAdmin.getRSGroupInfo("bar");
@ -319,12 +320,9 @@ public abstract class TestRSGroupsBase {
@Test
public void testTableMoveTruncateAndDrop() throws Exception {
LOG.info("testTableMove");
final TableName tableName = TableName.valueOf(tablePrefix + "_testTableMoveAndDrop");
final byte[] familyNameBytes = Bytes.toBytes("f");
String newGroupName = getGroupName("testTableMove");
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 2);
String newGroupName = getGroupName(name.getMethodName());
final RSGroupInfo newGroup = addGroup(newGroupName, 2);
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 5);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@ -378,14 +376,14 @@ public abstract class TestRSGroupsBase {
@Test
public void testGroupBalance() throws Exception {
LOG.info("testGroupBalance");
String newGroupName = getGroupName("testGroupBalance");
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 3);
LOG.info(name.getMethodName());
String newGroupName = getGroupName(name.getMethodName());
final RSGroupInfo newGroup = addGroup(newGroupName, 3);
final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "testGroupBalance");
final TableName tableName = TableName.valueOf(tablePrefix+"_ns", name.getMethodName());
admin.createNamespace(
NamespaceDescriptor.create(tableName.getNamespaceAsString())
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, newGroupName).build());
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, newGroupName).build());
final byte[] familyNameBytes = Bytes.toBytes("f");
final HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
@ -447,10 +445,7 @@ public abstract class TestRSGroupsBase {
@Test
public void testRegionMove() throws Exception {
LOG.info("testRegionMove");
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, getGroupName("testRegionMove"), 1);
final TableName tableName = TableName.valueOf(tablePrefix + rand.nextInt());
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
final byte[] familyNameBytes = Bytes.toBytes("f");
// All the regions created below will be assigned to the default group.
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 6);
@ -515,11 +510,8 @@ public abstract class TestRSGroupsBase {
@Test
public void testFailRemoveGroup() throws IOException, InterruptedException {
LOG.info("testFailRemoveGroup");
int initNumGroups = rsGroupAdmin.listRSGroups().size();
addGroup(rsGroupAdmin, "bar", 3);
TableName tableName = TableName.valueOf(tablePrefix+"_my_table");
addGroup("bar", 3);
TEST_UTIL.createTable(tableName, Bytes.toBytes("f"));
rsGroupAdmin.moveTables(Sets.newHashSet(tableName), "bar");
RSGroupInfo barGroup = rsGroupAdmin.getRSGroupInfo("bar");
@ -551,14 +543,12 @@ public abstract class TestRSGroupsBase {
@Test
public void testKillRS() throws Exception {
LOG.info("testKillRS");
RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1);
RSGroupInfo appInfo = addGroup("appInfo", 1);
final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "_testKillRS");
final TableName tableName = TableName.valueOf(tablePrefix+"_ns", name.getMethodName());
admin.createNamespace(
NamespaceDescriptor.create(tableName.getNamespaceAsString())
.addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, appInfo.getName()).build());
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
final HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
admin.createTable(desc);
@ -645,13 +635,11 @@ public abstract class TestRSGroupsBase {
@Test
public void testMultiTableMove() throws Exception {
LOG.info("testMultiTableMove");
final TableName tableNameA = TableName.valueOf(tablePrefix + "_testMultiTableMoveA");
final TableName tableNameB = TableName.valueOf(tablePrefix + "_testMultiTableMoveB");
final TableName tableNameA = TableName.valueOf(tablePrefix + name.getMethodName() + "A");
final TableName tableNameB = TableName.valueOf(tablePrefix + name.getMethodName() + "B");
final byte[] familyNameBytes = Bytes.toBytes("f");
String newGroupName = getGroupName("testMultiTableMove");
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 1);
String newGroupName = getGroupName(name.getMethodName());
final RSGroupInfo newGroup = addGroup(newGroupName, 1);
TEST_UTIL.createTable(tableNameA, familyNameBytes);
TEST_UTIL.createTable(tableNameB, familyNameBytes);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -46,19 +47,21 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
//This tests that GroupBasedBalancer will use data in zk
//to do balancing during master startup
//This does not test retain assignment
// This tests that GroupBasedBalancer will use data in zk to do balancing during master startup.
// This does not test retain assignment.
// The tests brings up 3 RS, creates a new RS group 'my_group', moves 1 RS to 'my_group', assigns
// 'hbase:rsgroup' to 'my_group', and kill the only server in that group so that 'hbase:rsgroup'
// table isn't available. It then kills the active master and waits for backup master to come
// online. In new master, RSGroupInfoManagerImpl gets the data from zk and waits for the expected
// assignment with a timeout.
@Category(MediumTests.class)
public class TestRSGroupsOfflineMode {
private static final org.apache.commons.logging.Log LOG =
LogFactory.getLog(TestRSGroupsOfflineMode.class);
private static final Log LOG = LogFactory.getLog(TestRSGroupsOfflineMode.class);
private static HMaster master;
private static Admin hbaseAdmin;
private static HBaseTestingUtility TEST_UTIL;
private static HBaseCluster cluster;
private static RSGroupAdminEndpoint RSGroupAdminEndpoint;
public final static long WAIT_TIMEOUT = 60000*5;
private final static long WAIT_TIMEOUT = 60000 * 5;
@Rule
public TestName name = new TestName();
@ -88,8 +91,6 @@ public class TestRSGroupsOfflineMode {
master.getServerManager().getOnlineServersList().size() >= 3;
}
});
RSGroupAdminEndpoint =
master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
}
@AfterClass
@ -99,8 +100,7 @@ public class TestRSGroupsOfflineMode {
@Test
public void testOffline() throws Exception, InterruptedException {
//table should be after group table name
//so it gets assigned later
// Table should be after group table name so it gets assigned later.
final TableName failoverTable = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(failoverTable, Bytes.toBytes("f"));
final HRegionServer killRS = ((MiniHBaseCluster)cluster).getRegionServer(0);
@ -111,20 +111,20 @@ public class TestRSGroupsOfflineMode {
groupAdmin.addRSGroup(newGroup);
if(master.getAssignmentManager().getRegionStates().getRegionAssignments()
.containsValue(failoverRS.getServerName())) {
for(HRegionInfo regionInfo: hbaseAdmin.getOnlineRegions(failoverRS.getServerName())) {
for (HRegionInfo regionInfo : hbaseAdmin.getOnlineRegions(failoverRS.getServerName())) {
hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
Bytes.toBytes(failoverRS.getServerName().getServerName()));
}
LOG.info("Waiting for region unassignments on failover RS...");
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
@Override public boolean evaluate() throws Exception {
return master.getServerManager().getLoad(failoverRS.getServerName())
.getRegionsLoad().size() > 0;
}
});
}
//move server to group and make sure all tables are assigned
// Move server to group and make sure all tables are assigned.
groupAdmin.moveServers(Sets.newHashSet(groupRS.getServerName().getAddress()), newGroup);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
@ -133,7 +133,7 @@ public class TestRSGroupsOfflineMode {
master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1;
}
});
//move table to group and wait
// Move table to group and wait.
groupAdmin.moveTables(Sets.newHashSet(RSGroupInfoManager.RSGROUP_TABLE_NAME), newGroup);
LOG.info("Waiting for move table...");
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@ -142,10 +142,9 @@ public class TestRSGroupsOfflineMode {
return groupRS.getNumberOfOnlineRegions() == 1;
}
});
}
groupRS.stop("die");
//race condition here
// Race condition here.
TEST_UTIL.getHBaseCluster().getMaster().stopMaster();
LOG.info("Waiting for offline mode...");
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@ -159,16 +158,17 @@ public class TestRSGroupsOfflineMode {
}
});
RSGroupInfoManager groupMgr = RSGroupAdminEndpoint.getGroupInfoManager();
//make sure balancer is in offline mode, since this is what we're testing
// Get groupInfoManager from the new active master.
RSGroupInfoManager groupMgr = ((MiniHBaseCluster)cluster).getMaster().getMasterCoprocessorHost()
.findCoprocessors(RSGroupAdminEndpoint.class).get(0).getGroupInfoManager();
// Make sure balancer is in offline mode, since this is what we're testing.
assertFalse(groupMgr.isOnline());
//verify the group affiliation that's loaded from ZK instead of tables
// Verify the group affiliation that's loaded from ZK instead of tables.
assertEquals(newGroup,
groupMgr.getRSGroupOfTable(RSGroupInfoManager.RSGROUP_TABLE_NAME));
assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable));
//kill final regionserver to see the failover happens for all tables
//except GROUP table since it's group does not have any online RS
// Kill final regionserver to see the failover happens for all tables except GROUP table since
// it's group does not have any online RS.
killRS.stop("die");
master = TEST_UTIL.getHBaseCluster().getMaster();
LOG.info("Waiting for new table assignment...");
@ -180,7 +180,7 @@ public class TestRSGroupsOfflineMode {
});
Assert.assertEquals(0, failoverRS.getOnlineRegions(RSGroupInfoManager.RSGROUP_TABLE_NAME).size());
//need this for minicluster to shutdown cleanly
// Need this for minicluster to shutdown cleanly.
master.stopMaster();
}
}

View File

@ -89,8 +89,8 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
}
@Override
public boolean balanceRSGroup(String name) throws IOException {
return wrapped.balanceRSGroup(name);
public boolean balanceRSGroup(String groupName) throws IOException {
return wrapped.balanceRSGroup(groupName);
}
@Override
@ -113,7 +113,7 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
result.getValue(
RSGroupInfoManager.META_FAMILY_BYTES,
RSGroupInfoManager.META_QUALIFIER_BYTES));
groupMap.put(proto.getName(), RSGroupSerDe.toGroupInfo(proto));
groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto));
}
Assert.assertEquals(Sets.newHashSet(groupMap.values()),
Sets.newHashSet(wrapped.listRSGroups()));
@ -125,7 +125,7 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
ProtobufUtil.expectPBMagicPrefix(data);
ByteArrayInputStream bis = new ByteArrayInputStream(
data, ProtobufUtil.lengthOfPBMagic(), data.length);
zList.add(RSGroupSerDe.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
}
}
Assert.assertEquals(zList.size(), groupMap.size());