HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin (#994)
* HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin No need to create and close a new registry on demand. Audited other usages of getRegistry() and the code looks fine. * Fix checkstyle issues in RawAsyncHBaseAdmin
This commit is contained in:
parent
0a1c3b2055
commit
7162c02c0d
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
|
|||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import java.io.IOException;
|
||||
|
@ -46,7 +45,6 @@ import java.util.function.Supplier;
|
|||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||
|
@ -99,14 +97,12 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
|
||||
|
@ -755,7 +751,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
|
||||
public CompletableFuture<Void> addColumnFamily(
|
||||
TableName tableName, ColumnFamilyDescriptor columnFamily) {
|
||||
return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
|
||||
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
|
||||
|
@ -809,10 +806,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
.<NamespaceDescriptor> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
|
||||
controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
|
||||
req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
|
||||
.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
|
||||
.<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
|
||||
call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
|
||||
(s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
|
||||
-> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -830,13 +827,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
@Override
|
||||
public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
|
||||
return this
|
||||
.<List<NamespaceDescriptor>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
|
||||
controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
|
||||
done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
|
||||
.toNamespaceDescriptorList(resp))).call();
|
||||
.<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
|
||||
.<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
|
||||
List<NamespaceDescriptor>> call(controller, stub,
|
||||
ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
|
||||
s.listNamespaceDescriptors(c, req, done),
|
||||
(resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1080,10 +1076,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
|
||||
if (TableName.META_TABLE_NAME.equals(tableName)) {
|
||||
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
|
||||
// For meta table, we use zk to fetch all locations.
|
||||
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
|
||||
connection.getConfiguration());
|
||||
addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
|
||||
addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (metaRegions == null || metaRegions.isEmpty() ||
|
||||
|
@ -1092,8 +1085,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
} else {
|
||||
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
|
||||
}
|
||||
// close the registry.
|
||||
IOUtils.closeQuietly(registry);
|
||||
});
|
||||
return future;
|
||||
} else {
|
||||
|
@ -1689,11 +1680,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
@Override
|
||||
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
|
||||
return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
|
||||
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
|
||||
controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
|
||||
(s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
|
||||
(resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
|
||||
.call();
|
||||
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
|
||||
call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
|
||||
(s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
|
||||
(resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1710,13 +1700,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
@Override
|
||||
public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
|
||||
SyncReplicationState clusterState) {
|
||||
return this
|
||||
.<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
|
||||
return this.<TransitReplicationPeerSyncReplicationStateRequest,
|
||||
TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
|
||||
RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
|
||||
clusterState),
|
||||
(s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
|
||||
(resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
|
||||
() -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
|
||||
(s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
|
||||
(resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
|
||||
() -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1786,11 +1776,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return this
|
||||
.<List<ReplicationPeerDescription>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
|
||||
controller,
|
||||
stub,
|
||||
request,
|
||||
(controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
|
||||
List<ReplicationPeerDescription>> call(controller, stub, request,
|
||||
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
|
||||
(resp) -> resp.getPeerDescList().stream()
|
||||
.map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
|
||||
|
@ -2299,11 +2286,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
|
||||
public CompletableFuture<Void> decommissionRegionServers(
|
||||
List<ServerName> servers, boolean offload) {
|
||||
return this.<Void> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
|
||||
controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
|
||||
controller, stub,
|
||||
RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
|
||||
(s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
|
||||
.call();
|
||||
}
|
||||
|
@ -2325,11 +2314,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<Void> recommissionRegionServer(ServerName server,
|
||||
List<byte[]> encodedRegionNames) {
|
||||
return this.<Void> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
|
||||
stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
|
||||
(s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
|
||||
.call();
|
||||
.action((controller, stub) ->
|
||||
this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
|
||||
controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
|
||||
server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
|
||||
c, req, done), resp -> null)).call();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2395,7 +2384,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
* Get the region info for the passed region name. The region name may be a full region name or
|
||||
* encoded region name. If the region does not found, then it'll throw an UnknownRegionException
|
||||
* wrapped by a {@link CompletableFuture}
|
||||
* @param regionNameOrEncodedRegionName
|
||||
* @return region info, wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
|
||||
|
@ -2886,10 +2874,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
.<List<SecurityCapability>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
|
||||
controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
|
||||
done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
|
||||
.toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
|
||||
.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
|
||||
call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
|
||||
(s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
|
||||
(resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3066,14 +3055,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
MajorCompactionTimestampRequest request =
|
||||
MajorCompactionTimestampRequest.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
||||
return this
|
||||
.<Optional<Long>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
|
||||
controller, stub, request,
|
||||
(s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
|
||||
ProtobufUtil::toOptionalTimestamp)).call();
|
||||
return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
|
||||
this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
|
||||
call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
|
||||
c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3213,11 +3198,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<Boolean> isBalancerEnabled() {
|
||||
return this
|
||||
.<Boolean> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
|
||||
controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
|
||||
(s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
|
||||
.call();
|
||||
.action((controller, stub) ->
|
||||
this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
|
||||
stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
|
||||
-> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue