HBASE-10367 RegionServer graceful stop / decommissioning

Signed-off-by: Jerry He <jerryjch@apache.org>
This commit is contained in:
Jerry He 2017-10-19 21:44:38 -07:00
parent af479c580c
commit a43a00e89c
23 changed files with 555 additions and 386 deletions

View File

@ -17,6 +17,8 @@
#
# Add or remove servers from draining mode via zookeeper
# Deprecated in 2.0, and will be removed in 3.0. Use Admin decommission
# API instead.
require 'optparse'
include Java

View File

@ -2425,22 +2425,30 @@ public interface Admin extends Abortable, Closeable {
}
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param servers List of region servers to drain.
* Mark region server(s) as decommissioned to prevent additional regions from getting
* assigned to them. Optionally unload the regions on the servers. If there are multiple servers
* to be decommissioned, decommissioning them at the same time can prevent wasteful region
* movements. Region unloading is asynchronous.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
*/
void drainRegionServers(List<ServerName> servers) throws IOException;
void decommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException;
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining region servers.
* List region servers marked as decommissioned, which can not be assigned regions.
* @return List of decommissioned region servers.
*/
List<ServerName> listDrainingRegionServers() throws IOException;
List<ServerName> listDecommissionedRegionServers() throws IOException;
/**
* Remove drain from a region server to allow additional regions assignments.
* @param servers List of region servers to remove drain from.
* Remove decommission marker from a region server to allow regions assignments.
* Load regions onto the server if a list of regions is given. Region loading is
* asynchronous.
* @param server The server to recommission.
* @param encodedRegionNames Regions to load onto the server.
*/
void removeDrainFromRegionServers(List<ServerName> servers) throws IOException;
void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException;
/**
* Find all table and column families that are replicated from this cluster

View File

@ -759,22 +759,29 @@ public interface AsyncAdmin {
CompletableFuture<String> getLocks();
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param servers
* Mark region server(s) as decommissioned to prevent additional regions from getting
* assigned to them. Optionally unload the regions on the servers. If there are multiple servers
* to be decommissioned, decommissioning them at the same time can prevent wasteful region
* movements. Region unloading is asynchronous.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
*/
CompletableFuture<Void> drainRegionServers(List<ServerName> servers);
CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload);
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining region servers wrapped by {@link CompletableFuture}
* List region servers marked as decommissioned, which can not be assigned regions.
* @return List of decommissioned region servers wrapped by {@link CompletableFuture}
*/
CompletableFuture<List<ServerName>> listDrainingRegionServers();
CompletableFuture<List<ServerName>> listDecommissionedRegionServers();
/**
* Remove drain from a region server to allow additional regions assignments.
* @param servers List of region servers to remove drain from.
* Remove decommission marker from a region server to allow regions assignments. Load regions onto
* the server if a list of regions is given. Region loading is asynchronous.
* @param server The server to recommission.
* @param encodedRegionNames Regions to load onto the server.
*/
CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers);
CompletableFuture<Void> recommissionRegionServer(ServerName server,
List<byte[]> encodedRegionNames);
/**
* @return cluster status wrapped by {@link CompletableFuture}

View File

@ -446,18 +446,20 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
return wrap(rawAdmin.drainRegionServers(servers));
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
boolean offload) {
return wrap(rawAdmin.decommissionRegionServers(servers, offload));
}
@Override
public CompletableFuture<List<ServerName>> listDrainingRegionServers() {
return wrap(rawAdmin.listDrainingRegionServers());
public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
return wrap(rawAdmin.listDecommissionedRegionServers());
}
@Override
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) {
return wrap(rawAdmin.removeDrainFromRegionServers(servers));
public CompletableFuture<Void> recommissionRegionServer(ServerName server,
List<byte[]> encodedRegionNames) {
return wrap(rawAdmin.recommissionRegionServer(server, encodedRegionNames));
}
@Override

View File

@ -89,18 +89,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
@ -1727,22 +1727,22 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException {
return stub.listDrainingRegionServers(controller, request);
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
ListDecommissionedRegionServersRequest request) throws ServiceException {
return stub.listDecommissionedRegionServers(controller, request);
}
@Override
public DrainRegionServersResponse drainRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException {
return stub.drainRegionServers(controller, request);
public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DecommissionRegionServersRequest request) throws ServiceException {
return stub.decommissionRegionServers(controller, request);
}
@Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(
RpcController controller, RemoveDrainFromRegionServersRequest request)
public RecommissionRegionServerResponse recommissionRegionServer(
RpcController controller, RecommissionRegionServerRequest request)
throws ServiceException {
return stub.removeDrainFromRegionServers(controller, request);
return stub.recommissionRegionServer(controller, request);
}
@Override

View File

@ -171,7 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@ -4030,27 +4030,28 @@ public class HBaseAdmin implements Admin {
}
@Override
public void drainRegionServers(List<ServerName> servers) throws IOException {
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
public Void rpcCall() throws ServiceException {
master.drainRegionServers(getRpcController(),
RequestConverter.buildDrainRegionServersRequest(servers));
master.decommissionRegionServers(getRpcController(),
RequestConverter.buildDecommissionRegionServersRequest(servers, offload));
return null;
}
});
}
@Override
public List<ServerName> listDrainingRegionServers() throws IOException {
public List<ServerName> listDecommissionedRegionServers() throws IOException {
return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
getRpcControllerFactory()) {
@Override
public List<ServerName> rpcCall() throws ServiceException {
ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build();
ListDecommissionedRegionServersRequest req = ListDecommissionedRegionServersRequest.newBuilder().build();
List<ServerName> servers = new ArrayList<>();
for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req)
.getServerNameList()) {
for (HBaseProtos.ServerName server : master
.listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) {
servers.add(ProtobufUtil.toServerName(server));
}
return servers;
@ -4059,11 +4060,13 @@ public class HBaseAdmin implements Admin {
}
@Override
public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException {
public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
public Void rpcCall() throws ServiceException {
master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers));
master.recommissionRegionServer(getRpcController(),
RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames));
return null;
}
});

View File

@ -126,6 +126,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -136,8 +138,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -180,8 +180,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
@ -200,8 +200,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -1935,41 +1935,37 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<DrainRegionServersRequest, DrainRegionServersResponse, Void> call(controller, stub,
RequestConverter.buildDrainRegionServersRequest(servers),
(s, c, req, done) -> s.drainRegionServers(c, req, done), resp -> null)).call();
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this
.<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
(s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
.call();
}
@Override
public CompletableFuture<List<ServerName>> listDrainingRegionServers() {
return this
.<List<ServerName>> newMasterCaller()
.action(
(controller, stub) -> this
.<ListDrainingRegionServersRequest, ListDrainingRegionServersResponse, List<ServerName>> call(
controller,
stub,
ListDrainingRegionServersRequest.newBuilder().build(),
(s, c, req, done) -> s.listDrainingRegionServers(c, req, done),
resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toList()))).call();
public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
return this.<List<ServerName>> newMasterCaller()
.action((controller, stub) -> this
.<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
List<ServerName>> call(
controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
(s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toList())))
.call();
}
@Override
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<RemoveDrainFromRegionServersRequest, RemoveDrainFromRegionServersResponse, Void> call(
controller, stub, RequestConverter
.buildRemoveDrainFromRegionServersRequest(servers), (s, c, req, done) -> s
.removeDrainFromRegionServers(c, req, done), resp -> null)).call();
public CompletableFuture<Void> recommissionRegionServer(ServerName server,
List<byte[]> encodedRegionNames) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this
.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
(s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
.call();
}
/**

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -46,8 +48,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -94,8 +94,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -120,8 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -257,9 +257,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
}
@Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller,
RemoveDrainFromRegionServersRequest request) throws ServiceException {
return stub.removeDrainFromRegionServers(controller, request);
public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller,
RecommissionRegionServerRequest request) throws ServiceException {
return stub.recommissionRegionServer(controller, request);
}
@Override
@ -336,9 +336,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
}
@Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException {
return stub.listDrainingRegionServers(controller, request);
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
ListDecommissionedRegionServersRequest request) throws ServiceException {
return stub.listDecommissionedRegionServers(controller, request);
}
@Override
@ -493,9 +493,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
}
@Override
public DrainRegionServersResponse drainRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException {
return stub.drainRegionServers(controller, request);
public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DecommissionRegionServersRequest request) throws ServiceException {
return stub.decommissionRegionServers(controller, request);
}
@Override

View File

@ -96,11 +96,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@ -122,7 +122,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
@ -1851,15 +1851,21 @@ public final class RequestConverter {
return GET_QUOTA_STATES_REQUEST;
}
public static DrainRegionServersRequest buildDrainRegionServersRequest(List<ServerName> servers) {
return DrainRegionServersRequest.newBuilder().addAllServerName(toProtoServerNames(servers))
.build();
public static DecommissionRegionServersRequest
buildDecommissionRegionServersRequest(List<ServerName> servers, boolean offload) {
return DecommissionRegionServersRequest.newBuilder()
.addAllServerName(toProtoServerNames(servers)).setOffload(offload).build();
}
public static RemoveDrainFromRegionServersRequest buildRemoveDrainFromRegionServersRequest(
List<ServerName> servers) {
return RemoveDrainFromRegionServersRequest.newBuilder()
.addAllServerName(toProtoServerNames(servers)).build();
public static RecommissionRegionServerRequest
buildRecommissionRegionServerRequest(ServerName server, List<byte[]> encodedRegionNames) {
RecommissionRegionServerRequest.Builder builder = RecommissionRegionServerRequest.newBuilder();
if (encodedRegionNames != null) {
for (byte[] name : encodedRegionNames) {
builder.addRegion(buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, name));
}
}
return builder.setServerName(ProtobufUtil.toServerName(server)).build();
}
private static List<HBaseProtos.ServerName> toProtoServerNames(List<ServerName> servers) {

View File

@ -602,25 +602,27 @@ message SecurityCapabilitiesResponse {
repeated Capability capabilities = 1;
}
message ListDrainingRegionServersRequest {
message ListDecommissionedRegionServersRequest {
}
message ListDrainingRegionServersResponse {
repeated ServerName server_name = 1;
message ListDecommissionedRegionServersResponse {
repeated ServerName server_name = 1;
}
message DrainRegionServersRequest {
repeated ServerName server_name = 1;
message DecommissionRegionServersRequest {
repeated ServerName server_name = 1;
required bool offload = 2;
}
message DrainRegionServersResponse {
message DecommissionRegionServersResponse {
}
message RemoveDrainFromRegionServersRequest {
repeated ServerName server_name = 1;
message RecommissionRegionServerRequest {
required ServerName server_name = 1;
repeated RegionSpecifier region = 2;
}
message RemoveDrainFromRegionServersResponse {
message RecommissionRegionServerResponse {
}
message ListDeadServersRequest {
@ -967,17 +969,17 @@ service MasterService {
rpc ListReplicationPeers(ListReplicationPeersRequest)
returns(ListReplicationPeersResponse);
/** Returns a list of ServerNames marked as draining. */
rpc listDrainingRegionServers(ListDrainingRegionServersRequest)
returns(ListDrainingRegionServersResponse);
/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);
/** Mark a list of ServerNames as draining. */
rpc drainRegionServers(DrainRegionServersRequest)
returns(DrainRegionServersResponse);
/** Decommission region servers. */
rpc DecommissionRegionServers(DecommissionRegionServersRequest)
returns(DecommissionRegionServersResponse);
/** Unmark a list of ServerNames marked as draining. */
rpc removeDrainFromRegionServers(RemoveDrainFromRegionServersRequest)
returns(RemoveDrainFromRegionServersResponse);
/** Re-commission region server. */
rpc RecommissionRegionServer(RecommissionRegionServerRequest)
returns(RecommissionRegionServerResponse);
/** Fetches the Master's view of space utilization */
rpc GetSpaceQuotaRegionSizes(GetSpaceQuotaRegionSizesRequest)

View File

@ -1461,4 +1461,40 @@ public interface MasterObserver {
*/
default void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {}
/**
* Called before decommission region servers.
*/
default void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {}
/**
* Called after decommission region servers.
*/
default void postDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {}
/**
* Called before list decommissioned region servers.
*/
default void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {}
/**
* Called after list decommissioned region servers.
*/
default void postListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {}
/**
* Called before recommission region server.
*/
default void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
/**
* Called after recommission region server.
*/
default void postRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
}

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@ -3435,53 +3436,97 @@ public class HMaster extends HRegionServer implements MasterServices {
return peers;
}
@Override
public void drainRegionServer(final ServerName server) {
/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
* @param servers Region servers to decommission.
* @throws HBaseIOException
*/
public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
throws HBaseIOException {
List<ServerName> serversAdded = new ArrayList<>(servers.size());
// Place the decommission marker first.
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
try {
String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
ZKUtil.createAndFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to add drain for '" + server.getServerName() + "'."),
ke);
}
}
@Override
public List<ServerName> listDrainingRegionServers() {
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
List<ServerName> serverNames = new ArrayList<>();
List<String> serverStrs = null;
try {
serverStrs = ZKUtil.listChildrenNoWatch(getZooKeeper(), parentZnode);
} catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to list draining servers."), ke);
}
// No nodes is empty draining list or ZK connectivity issues.
if (serverStrs == null) {
return serverNames;
}
// Skip invalid ServerNames in result
for (String serverStr : serverStrs) {
for (ServerName server : servers) {
try {
serverNames.add(ServerName.parseServerName(serverStr));
} catch (IllegalArgumentException iae) {
LOG.warn("Unable to cast '" + serverStr + "' to ServerName.", iae);
String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
ZKUtil.createAndFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
throw new HBaseIOException(
this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
}
if (this.serverManager.addServerToDrainList(server)) {
serversAdded.add(server);
};
}
// Move the regions off the decommissioned servers.
if (offload) {
final List<ServerName> destServers = this.serverManager.createDestinationServersList();
for (ServerName server : serversAdded) {
final List<RegionInfo> regionsOnServer =
this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
for (RegionInfo hri : regionsOnServer) {
ServerName dest = balancer.randomAssignment(hri, destServers);
if (dest == null) {
throw new HBaseIOException("Unable to determine a plan to move " + hri);
}
RegionPlan rp = new RegionPlan(hri, server, dest);
this.assignmentManager.moveAsync(rp);
}
}
}
return serverNames;
}
@Override
public void removeDrainFromRegionServer(ServerName server) {
/**
* List region servers marked as decommissioned (previously called 'draining') to not get regions
* assigned to them.
* @return List of decommissioned servers.
*/
public List<ServerName> listDecommissionedRegionServers() {
return this.serverManager.getDrainingServersList();
}
/**
* Remove decommission marker (previously called 'draining') from a region server to allow regions
* assignments. Load regions onto the server asynchronously if a list of regions is given
* @param server Region server to remove decommission marker from.
* @throws HBaseIOException
*/
public void recommissionRegionServer(final ServerName server,
final List<byte[]> encodedRegionNames) throws HBaseIOException {
// Remove the server from decommissioned (draining) server list.
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
try {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
LOG.warn(
this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke);
throw new HBaseIOException(
this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
}
this.serverManager.removeServerFromDrainList(server);
// Load the regions onto the server if we are given a list of regions.
if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
return;
}
if (!this.serverManager.isServerOnline(server)) {
return;
}
for (byte[] encodedRegionName : encodedRegionNames) {
RegionState regionState =
assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
if (regionState == null) {
LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
continue;
}
RegionInfo hri = regionState.getRegion();
if (server.equals(regionState.getServerName())) {
LOG.info("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + server + ".");
continue;
}
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
this.assignmentManager.moveAsync(rp);
}
}

View File

@ -1684,4 +1684,60 @@ public class MasterCoprocessorHost
}
});
}
public void preDecommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preDecommissionRegionServers(this, servers, offload);
}
});
}
public void postDecommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postDecommissionRegionServers(this, servers, offload);
}
});
}
public void preListDecommissionedRegionServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListDecommissionedRegionServers(this);
}
});
}
public void postListDecommissionedRegionServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListDecommissionedRegionServers(this);
}
});
}
public void preRecommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preRecommissionRegionServer(this, server, encodedRegionNames);
}
});
}
public void postRecommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postRecommissionRegionServer(this, server, encodedRegionNames);
}
});
}
}

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -119,6 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -129,8 +132,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -177,8 +178,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -203,8 +204,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -1902,15 +1903,21 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException {
ListDrainingRegionServersResponse.Builder response =
ListDrainingRegionServersResponse.newBuilder();
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
RpcController controller, ListDecommissionedRegionServersRequest request)
throws ServiceException {
ListDecommissionedRegionServersResponse.Builder response =
ListDecommissionedRegionServersResponse.newBuilder();
try {
master.checkInitialized();
List<ServerName> servers = master.listDrainingRegionServers();
for (ServerName server : servers) {
response.addServerName(ProtobufUtil.toServerName(server));
if (master.cpHost != null) {
master.cpHost.preListDecommissionedRegionServers();
}
List<ServerName> servers = master.listDecommissionedRegionServers();
response.addAllServerName((servers.stream().map(server -> ProtobufUtil.toServerName(server)))
.collect(Collectors.toList()));
if (master.cpHost != null) {
master.cpHost.postListDecommissionedRegionServers();
}
} catch (IOException io) {
throw new ServiceException(io);
@ -1920,36 +1927,48 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
public DrainRegionServersResponse drainRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException {
DrainRegionServersResponse.Builder response = DrainRegionServersResponse.newBuilder();
public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DecommissionRegionServersRequest request) throws ServiceException {
try {
master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
master.drainRegionServer(ProtobufUtil.toServerName(pbServer));
List<ServerName> servers = request.getServerNameList().stream()
.map(pbServer -> ProtobufUtil.toServerName(pbServer)).collect(Collectors.toList());
boolean offload = request.getOffload();
if (master.cpHost != null) {
master.cpHost.preDecommissionRegionServers(servers, offload);
}
master.decommissionRegionServers(servers, offload);
if (master.cpHost != null) {
master.cpHost.postDecommissionRegionServers(servers, offload);
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
return DecommissionRegionServersResponse.newBuilder().build();
}
@Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller,
RemoveDrainFromRegionServersRequest request) throws ServiceException {
RemoveDrainFromRegionServersResponse.Builder response =
RemoveDrainFromRegionServersResponse.newBuilder();
public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller,
RecommissionRegionServerRequest request) throws ServiceException {
try {
master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
master.removeDrainFromRegionServer(ProtobufUtil.toServerName(pbServer));
ServerName server = ProtobufUtil.toServerName(request.getServerName());
List<byte[]> encodedRegionNames = request.getRegionList().stream()
.map(regionSpecifier -> regionSpecifier.getValue().toByteArray())
.collect(Collectors.toList());
if (master.cpHost != null) {
master.cpHost.preRecommissionRegionServer(server, encodedRegionNames);
}
master.recommissionRegionServer(server, encodedRegionNames);
if (master.cpHost != null) {
master.cpHost.postRecommissionRegionServer(server, encodedRegionNames);
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
return RecommissionRegionServerResponse.newBuilder().build();
}
@Override

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@ -475,24 +476,6 @@ public interface MasterServices extends Server {
List<ReplicationPeerDescription> listReplicationPeers(String regex) throws ReplicationException,
IOException;
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param server Region servers to drain.
*/
void drainRegionServer(final ServerName server);
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining servers.
*/
List<ServerName> listDrainingRegionServers();
/**
* Remove drain from a region server to allow additional regions assignments.
* @param server Region server to remove drain from.
*/
void removeDrainFromRegionServer(final ServerName server);
/**
* @return {@link LockManager} to lock namespaces/tables/regions.
*/

View File

@ -42,6 +42,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad;
@ -92,6 +93,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* A server is fully processed only after the handler is fully enabled
* and has completed the handling.
*/
/**
*
*/
@InterfaceAudience.Private
public class ServerManager {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
@ -664,7 +668,7 @@ public class ServerManager {
/*
* Remove the server from the drain list.
*/
public boolean removeServerFromDrainList(final ServerName sn) {
public synchronized boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
@ -676,10 +680,12 @@ public class ServerManager {
return this.drainingServers.remove(sn);
}
/*
/**
* Add the server to the drain list.
* @param sn
* @return True if the server is added or the server is already on the drain list.
*/
public boolean addServerToDrainList(final ServerName sn) {
public synchronized boolean addServerToDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
@ -693,7 +699,7 @@ public class ServerManager {
if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list." +
"Ignoring request to add it again.");
return false;
return true;
}
LOG.info("Server " + sn + " added to draining server list.");
return this.drainingServers.add(sn);

View File

@ -1504,6 +1504,23 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
requirePermission(getActiveUser(ctx), "clearDeadServers", Action.ADMIN);
}
@Override
public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {
requirePermission(getActiveUser(ctx), "decommissionRegionServers", Action.ADMIN);
}
@Override
public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
requirePermission(getActiveUser(ctx), "listDecommissionedRegionServers", Action.ADMIN);
}
@Override
public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {
requirePermission(getActiveUser(ctx), "recommissionRegionServers", Action.ADMIN);
}
/* ---- RegionObserver implementation ---- */
@Override

View File

@ -44,6 +44,9 @@ import org.apache.zookeeper.KeeperException;
* <p>If an RS gets added to the draining list, we add a watcher to it and call
* {@link ServerManager#addServerToDrainList(ServerName)}
*
* <p>This class is deprecated in 2.0 because decommission/draining API goes through
* master in 2.0. Can remove this class in 3.0.
*
*/
@InterfaceAudience.Private
public class DrainingServerTracker extends ZooKeeperListener {

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@ -34,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -695,65 +697,62 @@ public class TestAdmin2 {
assertTrue(lockList.startsWith("["));
}
/*
* This test drains all regions so cannot be run in parallel with other tests.
*/
@Ignore @Test(timeout = 30000)
public void testDrainRegionServers() throws Exception {
List<ServerName> drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
@Test(timeout = 30000)
public void testDecommissionRegionServers() throws Exception {
List<ServerName> decommissionedRegionServers = admin.listDecommissionedRegionServers();
assertTrue(decommissionedRegionServers.isEmpty());
// Drain all region servers.
Collection<ServerName> clusterServers =
admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
drainingServers = new ArrayList<>();
for (ServerName server : clusterServers) {
drainingServers.add(server);
}
admin.drainRegionServers(drainingServers);
final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createMultiRegionTable(tableName, "f".getBytes(), 6);
// Check that drain lists all region servers.
drainingServers = admin.listDrainingRegionServers();
assertEquals(clusterServers.size(), drainingServers.size());
for (ServerName server : clusterServers) {
assertTrue(drainingServers.contains(server));
}
ArrayList<ServerName> clusterRegionServers =
new ArrayList<>(admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers());
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
final TableName hTable = TableName.valueOf(name.getMethodName());
final HTableDescriptor htd = new HTableDescriptor(hTable);
htd.addFamily(new HColumnDescriptor("cf"));
assertEquals(clusterRegionServers.size(), 3);
final Runnable createTable = new Thread() {
@Override
public void run() {
try {
admin.createTable(htd);
} catch (IOException ioe) {
assertTrue(false); // Should not get IOException.
}
HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
// Get a server that has regions. We will decommission two of the servers,
// leaving one online.
int i;
for (i = 0; i < clusterRegionServers.size(); i++) {
List<RegionInfo> regionsOnServer = admin.getRegions(clusterRegionServers.get(i));
if (regionsOnServer.size() > 0) {
serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
break;
}
};
final ExecutorService executor = Executors.newSingleThreadExecutor();
final java.util.concurrent.Future<?> future = executor.submit(createTable);
executor.shutdown();
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException ie) {
assertTrue(true); // Expecting timeout to happen.
}
// Kill executor if still processing.
if (!executor.isTerminated()) {
executor.shutdownNow();
assertTrue(true);
clusterRegionServers.remove(i);
// Get another server to decommission.
serversToDecommssion.put(clusterRegionServers.get(0),
admin.getRegions(clusterRegionServers.get(0)));
ServerName remainingServer = clusterRegionServers.get(1);
// Decommission
admin.decommissionRegionServers(new ArrayList<ServerName>(serversToDecommssion.keySet()), true);
assertEquals(2, admin.listDecommissionedRegionServers().size());
// Verify the regions have been off the decommissioned servers, all on the one
// remaining server.
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, remainingServer, 10000);
}
}
// Remove drain list.
admin.removeDrainFromRegionServers(drainingServers);
drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
// Recommission and load the regions.
for (ServerName server : serversToDecommssion.keySet()) {
List<byte[]> encodedRegionNames = serversToDecommssion.get(server).stream()
.map(region -> region.getEncodedNameAsBytes()).collect(Collectors.toList());
admin.recommissionRegionServer(server, encodedRegionNames);
}
assertTrue(admin.listDecommissionedRegionServers().isEmpty());
// Verify the regions have been moved to the recommissioned servers
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, server, 10000);
}
}
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncDecommissionAdminApi extends TestAsyncAdminBase {
@Test(timeout = 30000)
public void testAsyncDecommissionRegionServers() throws Exception {
List<ServerName> decommissionedRegionServers = admin.listDecommissionedRegionServers().get();
assertTrue(decommissionedRegionServers.isEmpty());
TEST_UTIL.createMultiRegionTable(tableName, FAMILY, 4);
ArrayList<ServerName> clusterRegionServers =
new ArrayList<>(admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).get().getServers());
assertEquals(clusterRegionServers.size(), 2);
HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
// Get a server that has regions. We will decommission one of the servers,
// leaving one online.
int i;
for (i = 0; i < clusterRegionServers.size(); i++) {
List<RegionInfo> regionsOnServer = admin.getOnlineRegions(clusterRegionServers.get(i)).get();
if (regionsOnServer.size() > 0) {
serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
break;
}
}
clusterRegionServers.remove(i);
ServerName remainingServer = clusterRegionServers.get(0);
// Decommission
admin.decommissionRegionServers(new ArrayList<ServerName>(serversToDecommssion.keySet()),
true).get();
assertEquals(1, admin.listDecommissionedRegionServers().get().size());
// Verify the regions have been off the decommissioned servers, all on the remaining server.
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, remainingServer, 10000);
}
}
// Recommission and load regions
for (ServerName server : serversToDecommssion.keySet()) {
List<byte[]> encodedRegionNames = serversToDecommssion.get(server).stream()
.map(region -> region.getEncodedNameAsBytes()).collect(Collectors.toList());
admin.recommissionRegionServer(server, encodedRegionNames).get();
}
assertTrue(admin.listDecommissionedRegionServers().get().isEmpty());
// Verify the regions have been moved to the recommissioned servers
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, server, 10000);
}
}
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncDrainAdminApi extends TestAsyncAdminBase {
/*
* This test drains all regions so cannot be run in parallel with other tests.
*/
@Ignore @Test(timeout = 30000)
public void testDrainRegionServers() throws Exception {
List<ServerName> drainingServers = admin.listDrainingRegionServers().get();
assertTrue(drainingServers.isEmpty());
// Drain all region servers.
Collection<ServerName> clusterServers = admin.getRegionServers().get();
drainingServers = new ArrayList<>();
for (ServerName server : clusterServers) {
drainingServers.add(server);
}
admin.drainRegionServers(drainingServers).join();
// Check that drain lists all region servers.
drainingServers = admin.listDrainingRegionServers().get();
assertEquals(clusterServers.size(), drainingServers.size());
for (ServerName server : clusterServers) {
assertTrue(drainingServers.contains(server));
}
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
final Runnable createTable = new Thread() {
@Override
public void run() {
try {
admin.createTable(builder.build()).join();
} catch (Exception ioe) {
assertTrue(false); // Should not get IOException.
}
}
};
final ExecutorService executor = Executors.newSingleThreadExecutor();
final java.util.concurrent.Future<?> future = executor.submit(createTable);
executor.shutdown();
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException ie) {
assertTrue(true); // Expecting timeout to happen.
}
// Kill executor if still processing.
if (!executor.isTerminated()) {
executor.shutdownNow();
assertTrue(true);
}
// Remove drain list.
admin.removeDrainFromRegionServers(drainingServers);
drainingServers = admin.listDrainingRegionServers().get();
assertTrue(drainingServers.isEmpty());
}
}

View File

@ -408,21 +408,6 @@ public class MockNoopMasterServices implements MasterServices, Server {
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
}
@Override
public void drainRegionServer(ServerName server) {
return;
}
@Override
public List<ServerName> listDrainingRegionServers() {
return null;
}
@Override
public void removeDrainFromRegionServer(ServerName servers) {
return;
}
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
IOException {

View File

@ -325,19 +325,19 @@ public class TestZooKeeperACL {
if (!secureZKAvailable) {
return;
}
List<ServerName> drainingServers = new ArrayList<>(1);
drainingServers.add(ServerName.parseServerName("ZZZ,123,123"));
List<ServerName> decommissionedServers = new ArrayList<>(1);
decommissionedServers.add(ServerName.parseServerName("ZZZ,123,123"));
// If unable to connect to secure ZK cluster then this operation would fail.
TEST_UTIL.getAdmin().drainRegionServers(drainingServers);
TEST_UTIL.getAdmin().decommissionRegionServers(decommissionedServers, false);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
assertEquals(1, drainingServers.size());
assertEquals(ServerName.parseServerName("ZZZ,123,123"), drainingServers.get(0));
decommissionedServers = TEST_UTIL.getAdmin().listDecommissionedRegionServers();
assertEquals(1, decommissionedServers.size());
assertEquals(ServerName.parseServerName("ZZZ,123,123"), decommissionedServers.get(0));
TEST_UTIL.getAdmin().removeDrainFromRegionServers(drainingServers);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
assertEquals(0, drainingServers.size());
TEST_UTIL.getAdmin().recommissionRegionServer(decommissionedServers.get(0), null);
decommissionedServers = TEST_UTIL.getAdmin().listDecommissionedRegionServers();
assertEquals(0, decommissionedServers.size());
}
}