HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin

This commit is contained in:
zhangduo 2018-12-06 21:25:34 +08:00
parent 2446f0026b
commit 5d872d3422
7 changed files with 244 additions and 87 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience;
@ -26,6 +27,11 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface AsyncClusterConnection extends AsyncConnection {
/**
 * Get the admin service for the given region server.
 * @param serverName the region server to issue admin RPCs against
 * @return an {@link AsyncRegionServerAdmin} wrapping the admin stub for {@code serverName}
 */
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
/**
* Get the nonce generator for this connection.
*/

View File

@ -379,4 +379,9 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}
@Override
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
  // Build a per-call wrapper for the target server, backed by this connection.
  AsyncRegionServerAdmin admin = new AsyncRegionServerAdmin(serverName, this);
  return admin;
}
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
/**
 * A thin, future-returning wrapper around the {@link AdminService} stub of one region server.
 * <p/>
 * Compared to the raw protobuf interface, callers receive a {@link CompletableFuture} instead of
 * having to pull the result out of an {@link RpcCallback} and the error out of the
 * {@link RpcController} they passed in.
 * <p/>
 * Deliberately retry-free: different callers currently apply different retry policies, and any
 * unified retry logic would be moved into this class later if those policies ever converge.
 */
@InterfaceAudience.Private
public class AsyncRegionServerAdmin {

  /** Target region server for every RPC issued through this instance. */
  private final ServerName server;

  /** Connection that supplies the admin stub and the rpc controllers. */
  private final AsyncConnectionImpl conn;

  AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
    this.server = server;
    this.conn = conn;
  }

  /** Adapter describing how to invoke one protobuf admin RPC on a stub. */
  @FunctionalInterface
  private interface RpcCall<RESP> {
    void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
  }

  /**
   * Runs {@code rpcCall} against this server's admin stub and bridges the callback-style
   * protobuf API to a {@link CompletableFuture}.
   */
  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
    CompletableFuture<RESP> result = new CompletableFuture<>();
    HBaseRpcController controller = conn.rpcControllerFactory.newController();
    try {
      // The callback fires on success and failure alike; the controller records which it was.
      rpcCall.call(conn.getAdminStub(server), controller, resp -> {
        if (controller.failed()) {
          result.completeExceptionally(controller.getFailed());
        } else {
          result.complete(resp);
        }
      });
    } catch (IOException e) {
      // Could not even obtain the stub; surface the error through the returned future.
      result.completeExceptionally(e);
    }
    return result;
  }

  public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
    return call((stub, rpcc, done) -> stub.getRegionInfo(rpcc, request, done));
  }

  public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
    return call((stub, rpcc, done) -> stub.getStoreFile(rpcc, request, done));
  }

  public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
      GetOnlineRegionRequest request) {
    return call((stub, rpcc, done) -> stub.getOnlineRegion(rpcc, request, done));
  }

  public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
    return call((stub, rpcc, done) -> stub.openRegion(rpcc, request, done));
  }

  public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
    return call((stub, rpcc, done) -> stub.warmupRegion(rpcc, request, done));
  }

  public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
    return call((stub, rpcc, done) -> stub.closeRegion(rpcc, request, done));
  }

  public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
    return call((stub, rpcc, done) -> stub.flushRegion(rpcc, request, done));
  }

  public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
      CompactionSwitchRequest request) {
    return call((stub, rpcc, done) -> stub.compactionSwitch(rpcc, request, done));
  }

  public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
    return call((stub, rpcc, done) -> stub.compactRegion(rpcc, request, done));
  }

  public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
      ReplicateWALEntryRequest request) {
    return call((stub, rpcc, done) -> stub.replicateWALEntry(rpcc, request, done));
  }

  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
    return call((stub, rpcc, done) -> stub.replay(rpcc, request, done));
  }

  public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
    return call((stub, rpcc, done) -> stub.rollWALWriter(rpcc, request, done));
  }

  public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
    return call((stub, rpcc, done) -> stub.getServerInfo(rpcc, request, done));
  }

  public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
    return call((stub, rpcc, done) -> stub.stopServer(rpcc, request, done));
  }

  public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
      UpdateFavoredNodesRequest request) {
    return call((stub, rpcc, done) -> stub.updateFavoredNodes(rpcc, request, done));
  }

  public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
      UpdateConfigurationRequest request) {
    return call((stub, rpcc, done) -> stub.updateConfiguration(rpcc, request, done));
  }

  public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
    return call((stub, rpcc, done) -> stub.getRegionLoad(rpcc, request, done));
  }

  public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
      ClearCompactionQueuesRequest request) {
    return call((stub, rpcc, done) -> stub.clearCompactionQueues(rpcc, request, done));
  }

  public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
      ClearRegionBlockCacheRequest request) {
    return call((stub, rpcc, done) -> stub.clearRegionBlockCache(rpcc, request, done));
  }

  public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
      GetSpaceQuotaSnapshotsRequest request) {
    return call((stub, rpcc, done) -> stub.getSpaceQuotaSnapshots(rpcc, request, done));
  }

  public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
      ExecuteProceduresRequest request) {
    return call((stub, rpcc, done) -> stub.executeProcedures(rpcc, request, done));
  }
}

View File

@ -193,4 +193,4 @@ public final class FutureUtils {
future.completeExceptionally(e);
return future;
}
}
}

View File

@ -202,6 +202,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
@ -234,6 +235,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@ -1961,6 +1963,15 @@ public class HMaster extends HRegionServer implements MasterServices {
});
}
/**
 * Asynchronously asks {@code server} to warm up {@code region}. Best effort: a failure is only
 * logged, never propagated, since warmup is an optimization rather than a requirement.
 */
private void warmUpRegion(ServerName server, RegionInfo region) {
  FutureUtils.addListener(
    asyncClusterConnection.getRegionServerAdmin(server)
      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)),
    (resp, error) -> {
      if (error == null) {
        return;
      }
      LOG.warn("Failed to warm up region {} on server {}", region, server, error);
    });
}
// Public so can be accessed by tests. Blocks until move is done.
// Replace with an async implementation from which you can get
// a success/failure result.
@ -2031,11 +2042,12 @@ public class HMaster extends HRegionServer implements MasterServices {
}
TransitRegionStateProcedure proc =
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
// Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
serverManager.sendRegionWarmup(rp.getDestination(), hri);
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
// Warmup the region on the destination before initiating the move.
// A region server could reject the close request because it either does not
// have the specified region or the region is being split.
warmUpRegion(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
try {

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -159,25 +155,16 @@ public class ServerManager {
private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
new ConcurrentSkipListMap<>();
/**
* Map of admin interfaces per registered regionserver; these interfaces we use to control
* regionservers out on the cluster
*/
private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
/** List of region servers that should not get any more new regions. */
private final ArrayList<ServerName> drainingServers = new ArrayList<>();
private final MasterServices master;
private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer();
private final long maxSkew;
private final long warningSkew;
private final RpcControllerFactory rpcControllerFactory;
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
@ -189,8 +176,6 @@ public class ServerManager {
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
this.connection = master.getClusterConnection();
this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
}
@ -438,7 +423,6 @@ public class ServerManager {
void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
LOG.info("Registering regionserver=" + serverName);
this.onlineServers.put(serverName, sl);
this.rsAdmins.remove(serverName);
}
@VisibleForTesting
@ -634,7 +618,6 @@ public class ServerManager {
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
this.rsAdmins.remove(sn);
}
/*
@ -677,34 +660,6 @@ public class ServerManager {
return this.drainingServers.add(sn);
}
// RPC methods to region servers
private HBaseRpcController newRpcController() {
return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
}
/**
* Sends a WARMUP RPC to the specified server to warmup the specified region.
* <p>
* A region server could reject the close request because it either does not
* have the specified region or the region is being split.
* @param server server to warmup a region
* @param region region to warmup
*/
public void sendRegionWarmup(ServerName server,
RegionInfo region) {
if (server == null) return;
try {
AdminService.BlockingInterface admin = getRsAdmin(server);
HBaseRpcController controller = newRpcController();
ProtobufUtil.warmupRegion(controller, admin, region);
} catch (IOException e) {
LOG.error("Received exception in RPC for warmup server:" +
server + "region: " + region +
"exception: " + e);
}
}
/**
* Contacts a region server and waits up to timeout ms
* to close the region. This bypasses the active hmaster.
@ -737,28 +692,6 @@ public class ServerManager {
+ " timeout " + timeout);
}
/**
* @param sn
* @return Admin interface for the remote regionserver named <code>sn</code>
* @throws IOException
* @throws RetriesExhaustedException wrapping a ConnectException if failed
*/
public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
throws IOException {
AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
if (admin == null) {
LOG.debug("New admin connection to " + sn.toString());
if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
// A master is also a region server now, see HBASE-10569 for details
admin = ((HRegionServer)master).getRSRpcServices();
} else {
admin = this.connection.getAdmin(sn);
}
this.rsAdmins.put(sn, admin);
}
return admin;
}
/**
* Calculate min necessary to start. This is not an absolute. It is just
* a friction that will cause us hang around a bit longer waiting on

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -41,11 +43,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
@ -219,13 +219,8 @@ public class RSProcedureDispatcher
this.remoteProcedures = remoteProcedures;
}
private AdminService.BlockingInterface getRsAdmin() throws IOException {
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
if (admin == null) {
throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
" failed because no RPC connection found to this server");
}
return admin;
// Obtain the async admin endpoint for this dispatcher's target server from the
// shared cluster connection.
private AsyncRegionServerAdmin getRsAdmin() throws IOException {
  AsyncRegionServerAdmin admin =
    master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
  return admin;
}
protected final ServerName getServerName() {
@ -345,11 +340,7 @@ public class RSProcedureDispatcher
@VisibleForTesting
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
return getRsAdmin().executeProcedures(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return FutureUtils.get(getRsAdmin().executeProcedures(request));
}
protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {