diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 3e5bea320da..69bd6110f25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; - import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; import java.io.IOException; @@ -46,7 +45,6 @@ import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -99,14 +97,12 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; @@ -755,7 +751,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { + public CompletableFuture addColumnFamily( + TableName tableName, ColumnFamilyDescriptor columnFamily) { return this. procedureCall(tableName, RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), @@ -809,10 +806,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { . newMasterCaller() .action( (controller, stub) -> this - . call( - controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c, - req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil - .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); + . + call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), + (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) + -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); } @Override @@ -830,13 +827,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture> listNamespaceDescriptors() { return this - .> newMasterCaller() - .action( - (controller, stub) -> this - .> call( - controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, - done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil - .toNamespaceDescriptorList(resp))).call(); + .> newMasterCaller().action((controller, stub) -> this + .> call(controller, stub, + ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) -> + s.listNamespaceDescriptors(c, req, done), + (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call(); } @Override @@ -1080,10 +1076,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture> getTableHRegionLocations(TableName tableName) { if (TableName.META_TABLE_NAME.equals(tableName)) { CompletableFuture> future = new CompletableFuture<>(); - // For meta table, we use zk to fetch all locations. - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry( - connection.getConfiguration()); - addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> { + addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { if (err != null) { future.completeExceptionally(err); } else if (metaRegions == null || metaRegions.isEmpty() || @@ -1092,8 +1085,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } else { future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); } - // close the registry. - IOUtils.closeQuietly(registry); }); return future; } else { @@ -1689,11 +1680,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture getReplicationPeerConfig(String peerId) { return this. newMasterCaller().action((controller, stub) -> this - . call( - controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), - (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), - (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) - .call(); + . + call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), + (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), + (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); } @Override @@ -1710,13 +1700,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) { - return this - . procedureCall( + return this. procedureCall( RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, clusterState), - (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), - (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, - () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); + (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), + (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, + () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); } @Override @@ -1786,11 +1776,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return this .> newMasterCaller() .action( - (controller, stub) -> this - .> call( - controller, - stub, - request, + (controller, stub) -> this.> call(controller, stub, request, (s, c, req, done) -> s.listReplicationPeers(c, req, done), (resp) -> resp.getPeerDescList().stream() .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) @@ -2299,11 +2286,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture decommissionRegionServers(List servers, boolean offload) { + public CompletableFuture decommissionRegionServers( + List servers, boolean offload) { return this. newMasterCaller() .action((controller, stub) -> this . call( - controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload), + controller, stub, + RequestConverter.buildDecommissionRegionServersRequest(servers, offload), (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) .call(); } @@ -2325,11 +2314,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture recommissionRegionServer(ServerName server, List encodedRegionNames) { return this. newMasterCaller() - .action((controller, stub) -> this - . call(controller, - stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), - (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) - .call(); + .action((controller, stub) -> + this. call( + controller, stub, RequestConverter.buildRecommissionRegionServerRequest( + server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer( + c, req, done), resp -> null)).call(); } /** @@ -2395,7 +2384,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { * Get the region info for the passed region name. The region name may be a full region name or * encoded region name. If the region does not found, then it'll throw an UnknownRegionException * wrapped by a {@link CompletableFuture} - * @param regionNameOrEncodedRegionName * @return region info, wrapped by a {@link CompletableFuture} */ private CompletableFuture getRegionInfo(byte[] regionNameOrEncodedRegionName) { @@ -2886,10 +2874,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .> newMasterCaller() .action( (controller, stub) -> this - .> call( - controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req, - done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil - .toSecurityCapabilityList(resp.getCapabilitiesList()))).call(); + .> + call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), + (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), + (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) + .call(); } @Override @@ -3066,14 +3055,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return this - .> newMasterCaller() - .action( - (controller, stub) -> this - .> call( - controller, stub, request, - (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), - ProtobufUtil::toOptionalTimestamp)).call(); + return this.> newMasterCaller().action((controller, stub) -> + this.> + call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp( + c, req, done), ProtobufUtil::toOptionalTimestamp)).call(); } @Override @@ -3213,11 +3198,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture isBalancerEnabled() { return this . newMasterCaller() - .action( - (controller, stub) -> this. call( - controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), - (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) - .call(); + .action((controller, stub) -> + this. call(controller, + stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done) + -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call(); } @Override