HBASE-25454 Add trace support for connection registry (#2828)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2021-01-01 22:55:03 +08:00 committed by Duo Zhang
parent 805b2ae2ad
commit dcb78bd4bd
2 changed files with 58 additions and 41 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY; import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.DNS.getHostname; import static org.apache.hadoop.hbase.util.DNS.getHostname;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
@ -266,18 +268,23 @@ public class MasterRegistry implements ConnectionRegistry {
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c, return tracedFuture(
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0, () -> this
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations); .<GetMetaRegionLocationsResponse> call(
(c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d),
r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
.thenApply(MasterRegistry::transformMetaRegionLocations),
"MasterRegistry.getMetaRegionLocations");
} }
@Override @Override
public CompletableFuture<String> getClusterId() { public CompletableFuture<String> getClusterId() {
return this return tracedFuture(() -> this
.<GetClusterIdResponse> call( .<GetClusterIdResponse> call(
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
GetClusterIdResponse::hasClusterId, "getClusterId()") GetClusterIdResponse::hasClusterId, "getClusterId()")
.thenApply(GetClusterIdResponse::getClusterId); .thenApply(GetClusterIdResponse::getClusterId), "MasterRegistry.getClusterId");
} }
private static boolean hasActiveMaster(GetMastersResponse resp) { private static boolean hasActiveMaster(GetMastersResponse resp) {
@ -300,21 +307,23 @@ public class MasterRegistry implements ConnectionRegistry {
@Override @Override
public CompletableFuture<ServerName> getActiveMaster() { public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>(); return tracedFuture(() -> {
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d), CompletableFuture<ServerName> future = new CompletableFuture<>();
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> { addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
if (ex != null) { MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
future.completeExceptionally(ex); if (ex != null) {
} future.completeExceptionally(ex);
ServerName result = null; }
try { ServerName result = null;
result = filterActiveMaster((GetMastersResponse)resp); try {
} catch (IOException e) { result = filterActiveMaster((GetMastersResponse) resp);
future.completeExceptionally(e); } catch (IOException e) {
} future.completeExceptionally(e);
future.complete(result); }
}); future.complete(result);
return future; });
return future;
}, "MasterRegistry.getActiveMaster");
} }
private static List<ServerName> transformServerNames(GetMastersResponse resp) { private static List<ServerName> transformServerNames(GetMastersResponse resp) {
@ -335,11 +344,13 @@ public class MasterRegistry implements ConnectionRegistry {
@Override @Override
public void close() { public void close() {
if (masterAddressRefresher != null) { trace(() -> {
masterAddressRefresher.close(); if (masterAddressRefresher != null) {
} masterAddressRefresher.close();
if (rpcClient != null) { }
rpcClient.close(); if (rpcClient != null) {
} rpcClient.close();
}
}, "MasterRegistry.close");
} }
} }

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
@ -94,7 +95,9 @@ class ZKConnectionRegistry implements ConnectionRegistry {
@Override @Override
public CompletableFuture<String> getClusterId() { public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId); return tracedFuture(
() -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
"ZKConnectionRegistry.getClusterId");
} }
ReadOnlyZKClient getZKClient() { ReadOnlyZKClient getZKClient() {
@ -192,19 +195,20 @@ class ZKConnectionRegistry implements ConnectionRegistry {
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>(); return tracedFuture(() -> {
addListener( CompletableFuture<RegionLocations> future = new CompletableFuture<>();
zk.list(znodePaths.baseZNode) addListener(
.thenApply(children -> children.stream() zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> { (metaReplicaZNodes, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
} }
getMetaRegionLocation(future, metaReplicaZNodes); getMetaRegionLocation(future, metaReplicaZNodes);
}); });
return future; return future;
}, "ZKConnectionRegistry.getMetaRegionLocations");
} }
private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
@ -218,7 +222,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
@Override @Override
public CompletableFuture<ServerName> getActiveMaster() { public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) return tracedFuture(
() -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> { .thenApply(proto -> {
if (proto == null) { if (proto == null) {
return null; return null;
@ -226,7 +231,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
HBaseProtos.ServerName snProto = proto.getMaster(); HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode()); snProto.getStartCode());
}); }),
"ZKConnectionRegistry.getActiveMaster");
} }
@Override @Override