HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin (#994)
* HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin
No need to create and close a new registry on demand. Audited other
usages of getRegistry() and the code looks fine.
* Fix checkstyle issues in RawAsyncHBaseAdmin
(cherry picked from commit 07c38260f5
)
This commit is contained in:
parent
c650f28ab4
commit
1b52501b1b
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
|
|||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import java.io.IOException;
|
||||
|
@ -46,7 +45,6 @@ import java.util.function.Supplier;
|
|||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||
|
@ -98,14 +96,12 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
|
||||
|
@ -784,7 +780,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
|
||||
public CompletableFuture<Void> addColumnFamily(
|
||||
TableName tableName, ColumnFamilyDescriptor columnFamily) {
|
||||
return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
|
||||
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
|
||||
|
@ -838,10 +835,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
.<NamespaceDescriptor> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
|
||||
controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
|
||||
req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
|
||||
.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
|
||||
.<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
|
||||
call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
|
||||
(s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
|
||||
-> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -859,13 +856,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
@Override
|
||||
public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
|
||||
return this
|
||||
.<List<NamespaceDescriptor>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
|
||||
controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
|
||||
done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
|
||||
.toNamespaceDescriptorList(resp))).call();
|
||||
.<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
|
||||
.<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
|
||||
List<NamespaceDescriptor>> call(controller, stub,
|
||||
ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
|
||||
s.listNamespaceDescriptors(c, req, done),
|
||||
(resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1097,10 +1093,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
|
||||
if (TableName.META_TABLE_NAME.equals(tableName)) {
|
||||
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
|
||||
// For meta table, we use zk to fetch all locations.
|
||||
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
|
||||
connection.getConfiguration());
|
||||
addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
|
||||
addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (metaRegions == null || metaRegions.isEmpty() ||
|
||||
|
@ -1109,8 +1102,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
} else {
|
||||
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
|
||||
}
|
||||
// close the registry.
|
||||
IOUtils.closeQuietly(registry);
|
||||
});
|
||||
return future;
|
||||
} else {
|
||||
|
@ -1793,11 +1784,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return this
|
||||
.<List<ReplicationPeerDescription>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
|
||||
controller,
|
||||
stub,
|
||||
request,
|
||||
(controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
|
||||
List<ReplicationPeerDescription>> call(controller, stub, request,
|
||||
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
|
||||
(resp) -> resp.getPeerDescList().stream()
|
||||
.map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
|
||||
|
@ -2308,11 +2296,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
|
||||
public CompletableFuture<Void> decommissionRegionServers(
|
||||
List<ServerName> servers, boolean offload) {
|
||||
return this.<Void> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
|
||||
controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
|
||||
controller, stub,
|
||||
RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
|
||||
(s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
|
||||
.call();
|
||||
}
|
||||
|
@ -2334,11 +2324,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<Void> recommissionRegionServer(ServerName server,
|
||||
List<byte[]> encodedRegionNames) {
|
||||
return this.<Void> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
|
||||
stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
|
||||
(s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
|
||||
.call();
|
||||
.action((controller, stub) ->
|
||||
this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
|
||||
controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
|
||||
server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
|
||||
c, req, done), resp -> null)).call();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2404,7 +2394,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
* Get the region info for the passed region name. The region name may be a full region name or
|
||||
* encoded region name. If the region does not found, then it'll throw an UnknownRegionException
|
||||
* wrapped by a {@link CompletableFuture}
|
||||
* @param regionNameOrEncodedRegionName
|
||||
* @return region info, wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
|
||||
|
@ -2895,10 +2884,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
.<List<SecurityCapability>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
|
||||
controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
|
||||
done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
|
||||
.toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
|
||||
.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
|
||||
call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
|
||||
(s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
|
||||
(resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3075,14 +3065,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
MajorCompactionTimestampRequest request =
|
||||
MajorCompactionTimestampRequest.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
||||
return this
|
||||
.<Optional<Long>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
|
||||
controller, stub, request,
|
||||
(s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
|
||||
ProtobufUtil::toOptionalTimestamp)).call();
|
||||
return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
|
||||
this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
|
||||
call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
|
||||
c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3222,11 +3208,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<Boolean> isBalancerEnabled() {
|
||||
return this
|
||||
.<Boolean> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
|
||||
controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
|
||||
(s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
|
||||
.call();
|
||||
.action((controller, stub) ->
|
||||
this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
|
||||
stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
|
||||
-> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue