HBASE-10367 RegionServer graceful stop / decommissioning

Signed-off-by: Jerry He <jerryjch@apache.org>
This commit is contained in:
Jerry He 2017-10-19 21:44:38 -07:00
parent 64f024a76c
commit 75d2bba739
23 changed files with 555 additions and 386 deletions

View File

@ -17,6 +17,8 @@
# #
# Add or remove servers from draining mode via zookeeper # Add or remove servers from draining mode via zookeeper
# Deprecated in 2.0, and will be removed in 3.0. Use Admin decommission
# API instead.
require 'optparse' require 'optparse'
include Java include Java

View File

@ -2425,22 +2425,30 @@ public interface Admin extends Abortable, Closeable {
} }
/** /**
* Mark a region server as draining to prevent additional regions from getting assigned to it. * Mark region server(s) as decommissioned to prevent additional regions from getting
* @param servers List of region servers to drain. * assigned to them. Optionally unload the regions on the servers. If there are multiple servers
* to be decommissioned, decommissioning them at the same time can prevent wasteful region
* movements. Region unloading is asynchronous.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
*/ */
void drainRegionServers(List<ServerName> servers) throws IOException; void decommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException;
/** /**
* List region servers marked as draining to not get additional regions assigned to them. * List region servers marked as decommissioned, which can not be assigned regions.
* @return List of draining region servers. * @return List of decommissioned region servers.
*/ */
List<ServerName> listDrainingRegionServers() throws IOException; List<ServerName> listDecommissionedRegionServers() throws IOException;
/** /**
* Remove drain from a region server to allow additional regions assignments. * Remove decommission marker from a region server to allow regions assignments.
* @param servers List of region servers to remove drain from. * Load regions onto the server if a list of regions is given. Region loading is
* asynchronous.
* @param server The server to recommission.
* @param encodedRegionNames Regions to load onto the server.
*/ */
void removeDrainFromRegionServers(List<ServerName> servers) throws IOException; void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException;
/** /**
* Find all table and column families that are replicated from this cluster * Find all table and column families that are replicated from this cluster

View File

@ -759,22 +759,29 @@ public interface AsyncAdmin {
CompletableFuture<String> getLocks(); CompletableFuture<String> getLocks();
/** /**
* Mark a region server as draining to prevent additional regions from getting assigned to it. * Mark region server(s) as decommissioned to prevent additional regions from getting
* @param servers * assigned to them. Optionally unload the regions on the servers. If there are multiple servers
* to be decommissioned, decommissioning them at the same time can prevent wasteful region
* movements. Region unloading is asynchronous.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
*/ */
CompletableFuture<Void> drainRegionServers(List<ServerName> servers); CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload);
/** /**
* List region servers marked as draining to not get additional regions assigned to them. * List region servers marked as decommissioned, which can not be assigned regions.
* @return List of draining region servers wrapped by {@link CompletableFuture} * @return List of decommissioned region servers wrapped by {@link CompletableFuture}
*/ */
CompletableFuture<List<ServerName>> listDrainingRegionServers(); CompletableFuture<List<ServerName>> listDecommissionedRegionServers();
/** /**
* Remove drain from a region server to allow additional regions assignments. * Remove decommission marker from a region server to allow regions assignments. Load regions onto
* @param servers List of region servers to remove drain from. * the server if a list of regions is given. Region loading is asynchronous.
* @param server The server to recommission.
* @param encodedRegionNames Regions to load onto the server.
*/ */
CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers); CompletableFuture<Void> recommissionRegionServer(ServerName server,
List<byte[]> encodedRegionNames);
/** /**
* @return cluster status wrapped by {@link CompletableFuture} * @return cluster status wrapped by {@link CompletableFuture}

View File

@ -446,18 +446,20 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) { public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
return wrap(rawAdmin.drainRegionServers(servers)); boolean offload) {
return wrap(rawAdmin.decommissionRegionServers(servers, offload));
} }
@Override @Override
public CompletableFuture<List<ServerName>> listDrainingRegionServers() { public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
return wrap(rawAdmin.listDrainingRegionServers()); return wrap(rawAdmin.listDecommissionedRegionServers());
} }
@Override @Override
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) { public CompletableFuture<Void> recommissionRegionServer(ServerName server,
return wrap(rawAdmin.removeDrainFromRegionServers(servers)); List<byte[]> encodedRegionNames) {
return wrap(rawAdmin.recommissionRegionServer(server, encodedRegionNames));
} }
@Override @Override

View File

@ -89,18 +89,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
@ -1727,22 +1727,22 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
@Override @Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller, public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException { ListDecommissionedRegionServersRequest request) throws ServiceException {
return stub.listDrainingRegionServers(controller, request); return stub.listDecommissionedRegionServers(controller, request);
} }
@Override @Override
public DrainRegionServersResponse drainRegionServers(RpcController controller, public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException { DecommissionRegionServersRequest request) throws ServiceException {
return stub.drainRegionServers(controller, request); return stub.decommissionRegionServers(controller, request);
} }
@Override @Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers( public RecommissionRegionServerResponse recommissionRegionServer(
RpcController controller, RemoveDrainFromRegionServersRequest request) RpcController controller, RecommissionRegionServerRequest request)
throws ServiceException { throws ServiceException {
return stub.removeDrainFromRegionServers(controller, request); return stub.recommissionRegionServer(controller, request);
} }
@Override @Override

View File

@ -171,7 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@ -4030,27 +4030,28 @@ public class HBaseAdmin implements Admin {
} }
@Override @Override
public void drainRegionServers(List<ServerName> servers) throws IOException { public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override @Override
public Void rpcCall() throws ServiceException { public Void rpcCall() throws ServiceException {
master.drainRegionServers(getRpcController(), master.decommissionRegionServers(getRpcController(),
RequestConverter.buildDrainRegionServersRequest(servers)); RequestConverter.buildDecommissionRegionServersRequest(servers, offload));
return null; return null;
} }
}); });
} }
@Override @Override
public List<ServerName> listDrainingRegionServers() throws IOException { public List<ServerName> listDecommissionedRegionServers() throws IOException {
return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
getRpcControllerFactory()) { getRpcControllerFactory()) {
@Override @Override
public List<ServerName> rpcCall() throws ServiceException { public List<ServerName> rpcCall() throws ServiceException {
ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build(); ListDecommissionedRegionServersRequest req = ListDecommissionedRegionServersRequest.newBuilder().build();
List<ServerName> servers = new ArrayList<>(); List<ServerName> servers = new ArrayList<>();
for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req) for (HBaseProtos.ServerName server : master
.getServerNameList()) { .listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) {
servers.add(ProtobufUtil.toServerName(server)); servers.add(ProtobufUtil.toServerName(server));
} }
return servers; return servers;
@ -4059,11 +4060,13 @@ public class HBaseAdmin implements Admin {
} }
@Override @Override
public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override @Override
public Void rpcCall() throws ServiceException { public Void rpcCall() throws ServiceException {
master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers)); master.recommissionRegionServer(getRpcController(),
RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames));
return null; return null;
} }
}); });

View File

@ -126,6 +126,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -136,8 +138,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -180,8 +180,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
@ -200,8 +200,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -1935,41 +1935,37 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) { public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
return this return this.<Void> newMasterCaller()
.<Void> newMasterCaller() .action((controller, stub) -> this
.action( .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
(controller, stub) -> this controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
.<DrainRegionServersRequest, DrainRegionServersResponse, Void> call(controller, stub, (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
RequestConverter.buildDrainRegionServersRequest(servers), .call();
(s, c, req, done) -> s.drainRegionServers(c, req, done), resp -> null)).call();
} }
@Override @Override
public CompletableFuture<List<ServerName>> listDrainingRegionServers() { public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
return this return this.<List<ServerName>> newMasterCaller()
.<List<ServerName>> newMasterCaller() .action((controller, stub) -> this
.action( .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
(controller, stub) -> this List<ServerName>> call(
.<ListDrainingRegionServersRequest, ListDrainingRegionServersResponse, List<ServerName>> call( controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
controller, (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
stub,
ListDrainingRegionServersRequest.newBuilder().build(),
(s, c, req, done) -> s.listDrainingRegionServers(c, req, done),
resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toList()))).call(); .collect(Collectors.toList())))
.call();
} }
@Override @Override
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) { public CompletableFuture<Void> recommissionRegionServer(ServerName server,
return this List<byte[]> encodedRegionNames) {
.<Void> newMasterCaller() return this.<Void> newMasterCaller()
.action( .action((controller, stub) -> this
(controller, stub) -> this .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
.<RemoveDrainFromRegionServersRequest, RemoveDrainFromRegionServersResponse, Void> call( stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
controller, stub, RequestConverter (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
.buildRemoveDrainFromRegionServersRequest(servers), (s, c, req, done) -> s .call();
.removeDrainFromRegionServers(c, req, done), resp -> null)).call();
} }
/** /**

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -46,8 +48,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -94,8 +94,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -120,8 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -257,9 +257,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
} }
@Override @Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller, public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller,
RemoveDrainFromRegionServersRequest request) throws ServiceException { RecommissionRegionServerRequest request) throws ServiceException {
return stub.removeDrainFromRegionServers(controller, request); return stub.recommissionRegionServer(controller, request);
} }
@Override @Override
@ -336,9 +336,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
} }
@Override @Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller, public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException { ListDecommissionedRegionServersRequest request) throws ServiceException {
return stub.listDrainingRegionServers(controller, request); return stub.listDecommissionedRegionServers(controller, request);
} }
@Override @Override
@ -493,9 +493,9 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
} }
@Override @Override
public DrainRegionServersResponse drainRegionServers(RpcController controller, public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException { DecommissionRegionServersRequest request) throws ServiceException {
return stub.drainRegionServers(controller, request); return stub.decommissionRegionServers(controller, request);
} }
@Override @Override

View File

@ -96,11 +96,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@ -122,7 +122,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
@ -1851,15 +1851,21 @@ public final class RequestConverter {
return GET_QUOTA_STATES_REQUEST; return GET_QUOTA_STATES_REQUEST;
} }
public static DrainRegionServersRequest buildDrainRegionServersRequest(List<ServerName> servers) { public static DecommissionRegionServersRequest
return DrainRegionServersRequest.newBuilder().addAllServerName(toProtoServerNames(servers)) buildDecommissionRegionServersRequest(List<ServerName> servers, boolean offload) {
.build(); return DecommissionRegionServersRequest.newBuilder()
.addAllServerName(toProtoServerNames(servers)).setOffload(offload).build();
} }
public static RemoveDrainFromRegionServersRequest buildRemoveDrainFromRegionServersRequest( public static RecommissionRegionServerRequest
List<ServerName> servers) { buildRecommissionRegionServerRequest(ServerName server, List<byte[]> encodedRegionNames) {
return RemoveDrainFromRegionServersRequest.newBuilder() RecommissionRegionServerRequest.Builder builder = RecommissionRegionServerRequest.newBuilder();
.addAllServerName(toProtoServerNames(servers)).build(); if (encodedRegionNames != null) {
for (byte[] name : encodedRegionNames) {
builder.addRegion(buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, name));
}
}
return builder.setServerName(ProtobufUtil.toServerName(server)).build();
} }
private static List<HBaseProtos.ServerName> toProtoServerNames(List<ServerName> servers) { private static List<HBaseProtos.ServerName> toProtoServerNames(List<ServerName> servers) {

View File

@ -602,25 +602,27 @@ message SecurityCapabilitiesResponse {
repeated Capability capabilities = 1; repeated Capability capabilities = 1;
} }
message ListDrainingRegionServersRequest { message ListDecommissionedRegionServersRequest {
} }
message ListDrainingRegionServersResponse { message ListDecommissionedRegionServersResponse {
repeated ServerName server_name = 1; repeated ServerName server_name = 1;
} }
message DrainRegionServersRequest { message DecommissionRegionServersRequest {
repeated ServerName server_name = 1; repeated ServerName server_name = 1;
required bool offload = 2;
} }
message DrainRegionServersResponse { message DecommissionRegionServersResponse {
} }
message RemoveDrainFromRegionServersRequest { message RecommissionRegionServerRequest {
repeated ServerName server_name = 1; required ServerName server_name = 1;
repeated RegionSpecifier region = 2;
} }
message RemoveDrainFromRegionServersResponse { message RecommissionRegionServerResponse {
} }
message ListDeadServersRequest { message ListDeadServersRequest {
@ -967,17 +969,17 @@ service MasterService {
rpc ListReplicationPeers(ListReplicationPeersRequest) rpc ListReplicationPeers(ListReplicationPeersRequest)
returns(ListReplicationPeersResponse); returns(ListReplicationPeersResponse);
/** Returns a list of ServerNames marked as draining. */ /** Returns a list of ServerNames marked as decommissioned. */
rpc listDrainingRegionServers(ListDrainingRegionServersRequest) rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDrainingRegionServersResponse); returns(ListDecommissionedRegionServersResponse);
/** Mark a list of ServerNames as draining. */ /** Decommission region servers. */
rpc drainRegionServers(DrainRegionServersRequest) rpc DecommissionRegionServers(DecommissionRegionServersRequest)
returns(DrainRegionServersResponse); returns(DecommissionRegionServersResponse);
/** Unmark a list of ServerNames marked as draining. */ /** Re-commission region server. */
rpc removeDrainFromRegionServers(RemoveDrainFromRegionServersRequest) rpc RecommissionRegionServer(RecommissionRegionServerRequest)
returns(RemoveDrainFromRegionServersResponse); returns(RecommissionRegionServerResponse);
/** Fetches the Master's view of space utilization */ /** Fetches the Master's view of space utilization */
rpc GetSpaceQuotaRegionSizes(GetSpaceQuotaRegionSizesRequest) rpc GetSpaceQuotaRegionSizes(GetSpaceQuotaRegionSizesRequest)

View File

@ -1461,4 +1461,40 @@ public interface MasterObserver {
*/ */
default void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx) default void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {} throws IOException {}
/**
* Called before decommission region servers.
*/
default void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {}
/**
* Called after decommission region servers.
*/
default void postDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {}
/**
* Called before list decommissioned region servers.
*/
default void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {}
/**
* Called after list decommissioned region servers.
*/
default void postListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {}
/**
* Called before recommission region server.
*/
default void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
/**
* Called after recommission region server.
*/
default void postRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
} }

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@ -3435,53 +3436,97 @@ public class HMaster extends HRegionServer implements MasterServices {
return peers; return peers;
} }
@Override /**
public void drainRegionServer(final ServerName server) { * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
* @param servers Region servers to decommission.
* @throws HBaseIOException
*/
public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
throws HBaseIOException {
List<ServerName> serversAdded = new ArrayList<>(servers.size());
// Place the decommission marker first.
String parentZnode = getZooKeeper().znodePaths.drainingZNode; String parentZnode = getZooKeeper().znodePaths.drainingZNode;
for (ServerName server : servers) {
try { try {
String node = ZKUtil.joinZNode(parentZnode, server.getServerName()); String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
ZKUtil.createAndFailSilent(getZooKeeper(), node); ZKUtil.createAndFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) { } catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to add drain for '" + server.getServerName() + "'."), throw new HBaseIOException(
ke); this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
}
if (this.serverManager.addServerToDrainList(server)) {
serversAdded.add(server);
};
}
// Move the regions off the decommissioned servers.
if (offload) {
final List<ServerName> destServers = this.serverManager.createDestinationServersList();
for (ServerName server : serversAdded) {
final List<RegionInfo> regionsOnServer =
this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
for (RegionInfo hri : regionsOnServer) {
ServerName dest = balancer.randomAssignment(hri, destServers);
if (dest == null) {
throw new HBaseIOException("Unable to determine a plan to move " + hri);
}
RegionPlan rp = new RegionPlan(hri, server, dest);
this.assignmentManager.moveAsync(rp);
}
}
} }
} }
@Override /**
public List<ServerName> listDrainingRegionServers() { * List region servers marked as decommissioned (previously called 'draining') to not get regions
String parentZnode = getZooKeeper().znodePaths.drainingZNode; * assigned to them.
List<ServerName> serverNames = new ArrayList<>(); * @return List of decommissioned servers.
List<String> serverStrs = null; */
try { public List<ServerName> listDecommissionedRegionServers() {
serverStrs = ZKUtil.listChildrenNoWatch(getZooKeeper(), parentZnode); return this.serverManager.getDrainingServersList();
} catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to list draining servers."), ke);
}
// No nodes is empty draining list or ZK connectivity issues.
if (serverStrs == null) {
return serverNames;
} }
// Skip invalid ServerNames in result /**
for (String serverStr : serverStrs) { * Remove decommission marker (previously called 'draining') from a region server to allow regions
try { * assignments. Load regions onto the server asynchronously if a list of regions is given
serverNames.add(ServerName.parseServerName(serverStr)); * @param server Region server to remove decommission marker from.
} catch (IllegalArgumentException iae) { * @throws HBaseIOException
LOG.warn("Unable to cast '" + serverStr + "' to ServerName.", iae); */
} public void recommissionRegionServer(final ServerName server,
} final List<byte[]> encodedRegionNames) throws HBaseIOException {
return serverNames; // Remove the server from decommissioned (draining) server list.
}
@Override
public void removeDrainFromRegionServer(ServerName server) {
String parentZnode = getZooKeeper().znodePaths.drainingZNode; String parentZnode = getZooKeeper().znodePaths.drainingZNode;
String node = ZKUtil.joinZNode(parentZnode, server.getServerName()); String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
try { try {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node); ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) { } catch (KeeperException ke) {
LOG.warn( throw new HBaseIOException(
this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke); this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
}
this.serverManager.removeServerFromDrainList(server);
// Load the regions onto the server if we are given a list of regions.
if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
return;
}
if (!this.serverManager.isServerOnline(server)) {
return;
}
for (byte[] encodedRegionName : encodedRegionNames) {
RegionState regionState =
assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
if (regionState == null) {
LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
continue;
}
RegionInfo hri = regionState.getRegion();
if (server.equals(regionState.getServerName())) {
LOG.info("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + server + ".");
continue;
}
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
this.assignmentManager.moveAsync(rp);
} }
} }

View File

@ -1684,4 +1684,60 @@ public class MasterCoprocessorHost
} }
}); });
} }
public void preDecommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preDecommissionRegionServers(this, servers, offload);
}
});
}
public void postDecommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postDecommissionRegionServers(this, servers, offload);
}
});
}
public void preListDecommissionedRegionServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListDecommissionedRegionServers(this);
}
});
}
public void postListDecommissionedRegionServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListDecommissionedRegionServers(this);
}
});
}
public void preRecommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preRecommissionRegionServer(this, server, encodedRegionNames);
}
});
}
public void postRecommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postRecommissionRegionServer(this, server, encodedRegionNames);
}
});
}
} }

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -119,6 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
@ -129,8 +132,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
@ -177,8 +178,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrM
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -203,8 +204,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@ -1902,15 +1903,21 @@ public class MasterRpcServices extends RSRpcServices
} }
@Override @Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller, public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
ListDrainingRegionServersRequest request) throws ServiceException { RpcController controller, ListDecommissionedRegionServersRequest request)
ListDrainingRegionServersResponse.Builder response = throws ServiceException {
ListDrainingRegionServersResponse.newBuilder(); ListDecommissionedRegionServersResponse.Builder response =
ListDecommissionedRegionServersResponse.newBuilder();
try { try {
master.checkInitialized(); master.checkInitialized();
List<ServerName> servers = master.listDrainingRegionServers(); if (master.cpHost != null) {
for (ServerName server : servers) { master.cpHost.preListDecommissionedRegionServers();
response.addServerName(ProtobufUtil.toServerName(server)); }
List<ServerName> servers = master.listDecommissionedRegionServers();
response.addAllServerName((servers.stream().map(server -> ProtobufUtil.toServerName(server)))
.collect(Collectors.toList()));
if (master.cpHost != null) {
master.cpHost.postListDecommissionedRegionServers();
} }
} catch (IOException io) { } catch (IOException io) {
throw new ServiceException(io); throw new ServiceException(io);
@ -1920,36 +1927,48 @@ public class MasterRpcServices extends RSRpcServices
} }
@Override @Override
public DrainRegionServersResponse drainRegionServers(RpcController controller, public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException { DecommissionRegionServersRequest request) throws ServiceException {
DrainRegionServersResponse.Builder response = DrainRegionServersResponse.newBuilder();
try { try {
master.checkInitialized(); master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) { List<ServerName> servers = request.getServerNameList().stream()
master.drainRegionServer(ProtobufUtil.toServerName(pbServer)); .map(pbServer -> ProtobufUtil.toServerName(pbServer)).collect(Collectors.toList());
boolean offload = request.getOffload();
if (master.cpHost != null) {
master.cpHost.preDecommissionRegionServers(servers, offload);
}
master.decommissionRegionServers(servers, offload);
if (master.cpHost != null) {
master.cpHost.postDecommissionRegionServers(servers, offload);
} }
} catch (IOException io) { } catch (IOException io) {
throw new ServiceException(io); throw new ServiceException(io);
} }
return response.build(); return DecommissionRegionServersResponse.newBuilder().build();
} }
@Override @Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller, public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller,
RemoveDrainFromRegionServersRequest request) throws ServiceException { RecommissionRegionServerRequest request) throws ServiceException {
RemoveDrainFromRegionServersResponse.Builder response =
RemoveDrainFromRegionServersResponse.newBuilder();
try { try {
master.checkInitialized(); master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) { ServerName server = ProtobufUtil.toServerName(request.getServerName());
master.removeDrainFromRegionServer(ProtobufUtil.toServerName(pbServer)); List<byte[]> encodedRegionNames = request.getRegionList().stream()
.map(regionSpecifier -> regionSpecifier.getValue().toByteArray())
.collect(Collectors.toList());
if (master.cpHost != null) {
master.cpHost.preRecommissionRegionServer(server, encodedRegionNames);
}
master.recommissionRegionServer(server, encodedRegionNames);
if (master.cpHost != null) {
master.cpHost.postRecommissionRegionServer(server, encodedRegionNames);
} }
} catch (IOException io) { } catch (IOException io) {
throw new ServiceException(io); throw new ServiceException(io);
} }
return response.build(); return RecommissionRegionServerResponse.newBuilder().build();
} }
@Override @Override

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableDescriptors;
@ -475,24 +476,6 @@ public interface MasterServices extends Server {
List<ReplicationPeerDescription> listReplicationPeers(String regex) throws ReplicationException, List<ReplicationPeerDescription> listReplicationPeers(String regex) throws ReplicationException,
IOException; IOException;
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param server Region servers to drain.
*/
void drainRegionServer(final ServerName server);
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining servers.
*/
List<ServerName> listDrainingRegionServers();
/**
* Remove drain from a region server to allow additional regions assignments.
* @param server Region server to remove drain from.
*/
void removeDrainFromRegionServer(final ServerName server);
/** /**
* @return {@link LockManager} to lock namespaces/tables/regions. * @return {@link LockManager} to lock namespaces/tables/regions.
*/ */

View File

@ -42,6 +42,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.RegionLoad;
@ -92,6 +93,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* A server is fully processed only after the handler is fully enabled * A server is fully processed only after the handler is fully enabled
* and has completed the handling. * and has completed the handling.
*/ */
/**
*
*/
@InterfaceAudience.Private @InterfaceAudience.Private
public class ServerManager { public class ServerManager {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART = public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
@ -664,7 +668,7 @@ public class ServerManager {
/* /*
* Remove the server from the drain list. * Remove the server from the drain list.
*/ */
public boolean removeServerFromDrainList(final ServerName sn) { public synchronized boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form: // Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode> // <hostname> , <port> , <startcode>
@ -676,10 +680,12 @@ public class ServerManager {
return this.drainingServers.remove(sn); return this.drainingServers.remove(sn);
} }
/* /**
* Add the server to the drain list. * Add the server to the drain list.
* @param sn
* @return True if the server is added or the server is already on the drain list.
*/ */
public boolean addServerToDrainList(final ServerName sn) { public synchronized boolean addServerToDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form: // Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode> // <hostname> , <port> , <startcode>
@ -693,7 +699,7 @@ public class ServerManager {
if (this.drainingServers.contains(sn)) { if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list." + LOG.warn("Server " + sn + " is already in the draining server list." +
"Ignoring request to add it again."); "Ignoring request to add it again.");
return false; return true;
} }
LOG.info("Server " + sn + " added to draining server list."); LOG.info("Server " + sn + " added to draining server list.");
return this.drainingServers.add(sn); return this.drainingServers.add(sn);

View File

@ -1504,6 +1504,23 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
requirePermission(getActiveUser(ctx), "clearDeadServers", Action.ADMIN); requirePermission(getActiveUser(ctx), "clearDeadServers", Action.ADMIN);
} }
@Override
public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, boolean offload) throws IOException {
requirePermission(getActiveUser(ctx), "decommissionRegionServers", Action.ADMIN);
}
@Override
public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
requirePermission(getActiveUser(ctx), "listDecommissionedRegionServers", Action.ADMIN);
}
@Override
public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
ServerName server, List<byte[]> encodedRegionNames) throws IOException {
requirePermission(getActiveUser(ctx), "recommissionRegionServers", Action.ADMIN);
}
/* ---- RegionObserver implementation ---- */ /* ---- RegionObserver implementation ---- */
@Override @Override

View File

@ -44,6 +44,9 @@ import org.apache.zookeeper.KeeperException;
* <p>If an RS gets added to the draining list, we add a watcher to it and call * <p>If an RS gets added to the draining list, we add a watcher to it and call
* {@link ServerManager#addServerToDrainList(ServerName)} * {@link ServerManager#addServerToDrainList(ServerName)}
* *
* <p>This class is deprecated in 2.0 because decommission/draining API goes through
* master in 2.0. Can remove this class in 3.0.
*
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DrainingServerTracker extends ZooKeeperListener { public class DrainingServerTracker extends ZooKeeperListener {

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -34,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -695,65 +697,62 @@ public class TestAdmin2 {
assertTrue(lockList.startsWith("[")); assertTrue(lockList.startsWith("["));
} }
/* @Test(timeout = 30000)
* This test drains all regions so cannot be run in parallel with other tests. public void testDecommissionRegionServers() throws Exception {
*/ List<ServerName> decommissionedRegionServers = admin.listDecommissionedRegionServers();
@Ignore @Test(timeout = 30000) assertTrue(decommissionedRegionServers.isEmpty());
public void testDrainRegionServers() throws Exception {
List<ServerName> drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
// Drain all region servers. final TableName tableName = TableName.valueOf(name.getMethodName());
Collection<ServerName> clusterServers = TEST_UTIL.createMultiRegionTable(tableName, "f".getBytes(), 6);
admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
drainingServers = new ArrayList<>();
for (ServerName server : clusterServers) {
drainingServers.add(server);
}
admin.drainRegionServers(drainingServers);
// Check that drain lists all region servers. ArrayList<ServerName> clusterRegionServers =
drainingServers = admin.listDrainingRegionServers(); new ArrayList<>(admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers());
assertEquals(clusterServers.size(), drainingServers.size());
for (ServerName server : clusterServers) {
assertTrue(drainingServers.contains(server));
}
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining. assertEquals(clusterRegionServers.size(), 3);
final TableName hTable = TableName.valueOf(name.getMethodName());
final HTableDescriptor htd = new HTableDescriptor(hTable);
htd.addFamily(new HColumnDescriptor("cf"));
final Runnable createTable = new Thread() { HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
@Override // Get a server that has regions. We will decommission two of the servers,
public void run() { // leaving one online.
try { int i;
admin.createTable(htd); for (i = 0; i < clusterRegionServers.size(); i++) {
} catch (IOException ioe) { List<RegionInfo> regionsOnServer = admin.getRegions(clusterRegionServers.get(i));
assertTrue(false); // Should not get IOException. if (regionsOnServer.size() > 0) {
serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
break;
} }
} }
};
final ExecutorService executor = Executors.newSingleThreadExecutor(); clusterRegionServers.remove(i);
final java.util.concurrent.Future<?> future = executor.submit(createTable); // Get another server to decommission.
executor.shutdown(); serversToDecommssion.put(clusterRegionServers.get(0),
try { admin.getRegions(clusterRegionServers.get(0)));
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException ie) { ServerName remainingServer = clusterRegionServers.get(1);
assertTrue(true); // Expecting timeout to happen.
// Decommission
admin.decommissionRegionServers(new ArrayList<ServerName>(serversToDecommssion.keySet()), true);
assertEquals(2, admin.listDecommissionedRegionServers().size());
// Verify the regions have been off the decommissioned servers, all on the one
// remaining server.
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, remainingServer, 10000);
}
} }
// Kill executor if still processing. // Recommission and load the regions.
if (!executor.isTerminated()) { for (ServerName server : serversToDecommssion.keySet()) {
executor.shutdownNow(); List<byte[]> encodedRegionNames = serversToDecommssion.get(server).stream()
assertTrue(true); .map(region -> region.getEncodedNameAsBytes()).collect(Collectors.toList());
admin.recommissionRegionServer(server, encodedRegionNames);
}
assertTrue(admin.listDecommissionedRegionServers().isEmpty());
// Verify the regions have been moved to the recommissioned servers
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, server, 10000);
}
} }
// Remove drain list.
admin.removeDrainFromRegionServers(drainingServers);
drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
} }
} }

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncDecommissionAdminApi extends TestAsyncAdminBase {
@Test(timeout = 30000)
public void testAsyncDecommissionRegionServers() throws Exception {
List<ServerName> decommissionedRegionServers = admin.listDecommissionedRegionServers().get();
assertTrue(decommissionedRegionServers.isEmpty());
TEST_UTIL.createMultiRegionTable(tableName, FAMILY, 4);
ArrayList<ServerName> clusterRegionServers =
new ArrayList<>(admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).get().getServers());
assertEquals(clusterRegionServers.size(), 2);
HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
// Get a server that has regions. We will decommission one of the servers,
// leaving one online.
int i;
for (i = 0; i < clusterRegionServers.size(); i++) {
List<RegionInfo> regionsOnServer = admin.getOnlineRegions(clusterRegionServers.get(i)).get();
if (regionsOnServer.size() > 0) {
serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
break;
}
}
clusterRegionServers.remove(i);
ServerName remainingServer = clusterRegionServers.get(0);
// Decommission
admin.decommissionRegionServers(new ArrayList<ServerName>(serversToDecommssion.keySet()),
true).get();
assertEquals(1, admin.listDecommissionedRegionServers().get().size());
// Verify the regions have been off the decommissioned servers, all on the remaining server.
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, remainingServer, 10000);
}
}
// Recommission and load regions
for (ServerName server : serversToDecommssion.keySet()) {
List<byte[]> encodedRegionNames = serversToDecommssion.get(server).stream()
.map(region -> region.getEncodedNameAsBytes()).collect(Collectors.toList());
admin.recommissionRegionServer(server, encodedRegionNames).get();
}
assertTrue(admin.listDecommissionedRegionServers().get().isEmpty());
// Verify the regions have been moved to the recommissioned servers
for (ServerName server : serversToDecommssion.keySet()) {
for (RegionInfo region : serversToDecommssion.get(server)) {
TEST_UTIL.assertRegionOnServer(region, server, 10000);
}
}
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncDrainAdminApi extends TestAsyncAdminBase {
/*
* This test drains all regions so cannot be run in parallel with other tests.
*/
@Ignore @Test(timeout = 30000)
public void testDrainRegionServers() throws Exception {
List<ServerName> drainingServers = admin.listDrainingRegionServers().get();
assertTrue(drainingServers.isEmpty());
// Drain all region servers.
Collection<ServerName> clusterServers = admin.getRegionServers().get();
drainingServers = new ArrayList<>();
for (ServerName server : clusterServers) {
drainingServers.add(server);
}
admin.drainRegionServers(drainingServers).join();
// Check that drain lists all region servers.
drainingServers = admin.listDrainingRegionServers().get();
assertEquals(clusterServers.size(), drainingServers.size());
for (ServerName server : clusterServers) {
assertTrue(drainingServers.contains(server));
}
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
final Runnable createTable = new Thread() {
@Override
public void run() {
try {
admin.createTable(builder.build()).join();
} catch (Exception ioe) {
assertTrue(false); // Should not get IOException.
}
}
};
final ExecutorService executor = Executors.newSingleThreadExecutor();
final java.util.concurrent.Future<?> future = executor.submit(createTable);
executor.shutdown();
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException ie) {
assertTrue(true); // Expecting timeout to happen.
}
// Kill executor if still processing.
if (!executor.isTerminated()) {
executor.shutdownNow();
assertTrue(true);
}
// Remove drain list.
admin.removeDrainFromRegionServers(drainingServers);
drainingServers = admin.listDrainingRegionServers().get();
assertTrue(drainingServers.isEmpty());
}
}

View File

@ -408,21 +408,6 @@ public class MockNoopMasterServices implements MasterServices, Server {
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException { public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
} }
@Override
public void drainRegionServer(ServerName server) {
return;
}
@Override
public List<ServerName> listDrainingRegionServers() {
return null;
}
@Override
public void removeDrainFromRegionServer(ServerName servers) {
return;
}
@Override @Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
IOException { IOException {

View File

@ -325,19 +325,19 @@ public class TestZooKeeperACL {
if (!secureZKAvailable) { if (!secureZKAvailable) {
return; return;
} }
List<ServerName> drainingServers = new ArrayList<>(1); List<ServerName> decommissionedServers = new ArrayList<>(1);
drainingServers.add(ServerName.parseServerName("ZZZ,123,123")); decommissionedServers.add(ServerName.parseServerName("ZZZ,123,123"));
// If unable to connect to secure ZK cluster then this operation would fail. // If unable to connect to secure ZK cluster then this operation would fail.
TEST_UTIL.getAdmin().drainRegionServers(drainingServers); TEST_UTIL.getAdmin().decommissionRegionServers(decommissionedServers, false);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers(); decommissionedServers = TEST_UTIL.getAdmin().listDecommissionedRegionServers();
assertEquals(1, drainingServers.size()); assertEquals(1, decommissionedServers.size());
assertEquals(ServerName.parseServerName("ZZZ,123,123"), drainingServers.get(0)); assertEquals(ServerName.parseServerName("ZZZ,123,123"), decommissionedServers.get(0));
TEST_UTIL.getAdmin().removeDrainFromRegionServers(drainingServers); TEST_UTIL.getAdmin().recommissionRegionServer(decommissionedServers.get(0), null);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers(); decommissionedServers = TEST_UTIL.getAdmin().listDecommissionedRegionServers();
assertEquals(0, drainingServers.size()); assertEquals(0, decommissionedServers.size());
} }
} }