HBASE-23898 Add trace support for simple apis in async client (#2813)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2020-12-30 14:58:57 +08:00 committed by Duo Zhang
parent 57960fa8fa
commit 805b2ae2ad
16 changed files with 1169 additions and 346 deletions

View File

@ -144,6 +144,16 @@
<groupId>org.jruby.joni</groupId> <groupId>org.jruby.joni</groupId>
<artifactId>joni</artifactId> <artifactId>joni</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId> <artifactId>jcl-over-slf4j</artifactId>

View File

@ -64,8 +64,8 @@ public interface AsyncConnection extends Closeable {
/** /**
* Retrieve an {@link AsyncTable} implementation for accessing a table. * Retrieve an {@link AsyncTable} implementation for accessing a table.
* <p> * <p>
* The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if you
* you want to customize some configs. * want to customize some configs.
* <p> * <p>
* This method no longer checks table existence. An exception will be thrown if the table does not * This method no longer checks table existence. An exception will be thrown if the table does not
* exist only when the first operation is attempted. * exist only when the first operation is attempted.

View File

@ -27,6 +27,8 @@ import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRI
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Optional; import java.util.Optional;
@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils; import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -121,7 +124,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile ConnectionOverAsyncConnection conn; private volatile ConnectionOverAsyncConnection conn;
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) { SocketAddress localAddress, User user) {
this.conf = conf; this.conf = conf;
this.user = user; this.user = user;
@ -135,8 +138,8 @@ class AsyncConnectionImpl implements AsyncConnection {
} else { } else {
this.metrics = Optional.empty(); this.metrics = Optional.empty();
} }
this.rpcClient = RpcClientFactory.createClient( this.rpcClient =
conf, clusterId, localAddress, metrics.orElse(null)); RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcTimeout = this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
@ -159,14 +162,13 @@ class AsyncConnectionImpl implements AsyncConnection {
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
} else { } else {
try { try {
listener = new ClusterStatusListener( listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
new ClusterStatusListener.DeadServerHandler() { @Override
@Override public void newDead(ServerName sn) {
public void newDead(ServerName sn) { locator.clearCache(sn);
locator.clearCache(sn); rpcClient.cancelConnections(sn);
rpcClient.cancelConnections(sn); }
} }, conf, listenerClass);
}, conf, listenerClass);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
} }
@ -206,28 +208,30 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public void close() { public void close() {
if (!closed.compareAndSet(false, true)) { TraceUtil.trace(() -> {
return; if (!closed.compareAndSet(false, true)) {
} return;
LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
if(LOG.isDebugEnabled()){
logCallStack(Thread.currentThread().getStackTrace());
}
IOUtils.closeQuietly(clusterStatusListener,
e -> LOG.warn("failed to close clusterStatusListener", e));
IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
synchronized (this) {
if (choreService != null) {
choreService.shutdown();
choreService = null;
} }
} LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
metrics.ifPresent(MetricsConnection::shutdown); if (LOG.isDebugEnabled()) {
ConnectionOverAsyncConnection c = this.conn; logCallStack(Thread.currentThread().getStackTrace());
if (c != null) { }
c.closePool(); IOUtils.closeQuietly(clusterStatusListener,
} e -> LOG.warn("failed to close clusterStatusListener", e));
IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
synchronized (this) {
if (choreService != null) {
choreService.shutdown();
choreService = null;
}
}
metrics.ifPresent(MetricsConnection::shutdown);
ConnectionOverAsyncConnection c = this.conn;
if (c != null) {
c.closePool();
}
}, "AsyncConnection.close");
} }
private void logCallStack(StackTraceElement[] stackTraceElements) { private void logCallStack(StackTraceElement[] stackTraceElements) {
@ -341,7 +345,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
ExecutorService pool) { ExecutorService pool) {
return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
@Override @Override
@ -382,7 +386,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) { ExecutorService pool) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER); RETRY_TIMER);
} }
@ -406,28 +410,36 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public CompletableFuture<Hbck> getHbck() { public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>(); return TraceUtil.tracedFuture(() -> {
addListener(registry.getActiveMaster(), (sn, error) -> { CompletableFuture<Hbck> future = new CompletableFuture<>();
if (error != null) { addListener(registry.getActiveMaster(), (sn, error) -> {
future.completeExceptionally(error); if (error != null) {
} else { future.completeExceptionally(error);
try { } else {
future.complete(getHbck(sn)); try {
} catch (IOException e) { future.complete(getHbck(sn));
future.completeExceptionally(e); } catch (IOException e) {
future.completeExceptionally(e);
}
} }
} });
}); return future;
return future; }, getClass().getName() + ".getHbck");
} }
@Override @Override
public Hbck getHbck(ServerName masterServer) throws IOException { public Hbck getHbck(ServerName masterServer) throws IOException {
// we will not create a new connection when creating a new protobuf stub, and for hbck there Span span = TraceUtil.createSpan(getClass().getName() + ".getHbck")
// will be no performance consideration, so for simplification we will create a new stub every .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
// time instead of caching the stub here. try (Scope scope = span.makeCurrent()) {
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( // we will not create a new connection when creating a new protobuf stub, and for hbck there
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); // will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(
MasterProtos.HbckService
.newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
rpcControllerFactory);
}
} }
Optional<MetricsConnection> getConnectionMetrics() { Optional<MetricsConnection> getConnectionMetrics() {

View File

@ -18,16 +18,28 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -61,7 +73,7 @@ class AsyncRegionLocator {
} }
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs, private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
Supplier<String> timeoutMsg) { Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) { if (future.isDone() || timeoutNs <= 0) {
return future; return future;
} }
@ -84,64 +96,101 @@ class AsyncRegionLocator {
return TableName.isMetaTableName(tableName); return TableName.isMetaTableName(tableName);
} }
private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan(getClass().getSimpleName() + "." + methodName, tableName);
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
if (error != null) {
span.recordException(error);
span.setStatus(StatusCode.ERROR);
} else {
List<String> regionNames = getRegionNames.apply(resp);
if (!regionNames.isEmpty()) {
span.setAttribute(REGION_NAMES_KEY, regionNames);
}
span.setStatus(StatusCode.OK);
}
span.end();
});
return future;
}
}
private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
}
return names;
}
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) { RegionLocateType type, boolean reload, long timeoutNs) {
CompletableFuture<RegionLocations> future = isMeta(tableName) return tracedLocationFuture(() -> {
? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) CompletableFuture<RegionLocations> future = isMeta(tableName) ?
: nonMetaRegionLocator.getRegionLocations(tableName, row, metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); nonMetaRegionLocator.getRegionLocations(tableName, row,
return withTimeout(future, timeoutNs, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + return withTimeout(future, timeoutNs,
"ms) waiting for region locations for " + tableName + ", row='" + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
Bytes.toStringBinary(row) + "'"); "ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
} }
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method. return tracedLocationFuture(() -> {
// Change it later if the meta table can have more than one regions. // meta region can not be split right now so we always call the same method.
CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); // Change it later if the meta table can have more than one regions.
CompletableFuture<RegionLocations> locsFuture = CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) CompletableFuture<RegionLocations> locsFuture =
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) :
addListener(locsFuture, (locs, error) -> { nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
if (error != null) { addListener(locsFuture, (locs, error) -> {
future.completeExceptionally(error); if (error != null) {
return; future.completeExceptionally(error);
} return;
HRegionLocation loc = locs.getRegionLocation(replicaId); }
if (loc == null) { HRegionLocation loc = locs.getRegionLocation(replicaId);
future.completeExceptionally( if (loc == null) {
new RegionOfflineException("No location for " + tableName + ", row='" + future.completeExceptionally(
Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); new RegionOfflineException("No location for " + tableName + ", row='" +
} else if (loc.getServerName() == null) { Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
future.completeExceptionally( } else if (loc.getServerName() == null) {
new RegionOfflineException("No server address listed for region '" + future.completeExceptionally(
loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + new RegionOfflineException("No server address listed for region '" +
"', locateType=" + type + ", replicaId=" + replicaId)); loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
} else { "', locateType=" + type + ", replicaId=" + replicaId));
future.complete(loc); } else {
} future.complete(loc);
}); }
return withTimeout(future, timeoutNs, });
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + return withTimeout(future, timeoutNs,
"ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"', replicaId=" + replicaId); "ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
} }
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, long timeoutNs) { int replicaId, RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
} }
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) { RegionLocateType type, boolean reload, long timeoutNs) {
return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload, return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
timeoutNs); timeoutNs);
} }
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType type, long timeoutNs) { RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, type, false, timeoutNs); return getRegionLocation(tableName, row, type, false, timeoutNs);
} }
@ -154,24 +203,31 @@ class AsyncRegionLocator {
} }
void clearCache(TableName tableName) { void clearCache(TableName tableName) {
LOG.debug("Clear meta cache for {}", tableName); TraceUtil.trace(() -> {
if (tableName.equals(META_TABLE_NAME)) { LOG.debug("Clear meta cache for {}", tableName);
metaRegionLocator.clearCache(); if (tableName.equals(META_TABLE_NAME)) {
} else { metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache(tableName); } else {
} nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
} }
void clearCache(ServerName serverName) { void clearCache(ServerName serverName) {
LOG.debug("Clear meta cache for {}", serverName); TraceUtil.trace(() -> {
metaRegionLocator.clearCache(serverName); LOG.debug("Clear meta cache for {}", serverName);
nonMetaRegionLocator.clearCache(serverName); metaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
} }
void clearCache() { void clearCache() {
metaRegionLocator.clearCache(); TraceUtil.trace(() -> {
nonMetaRegionLocator.clearCache(); metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
} }
AsyncNonMetaRegionLocator getNonMetaRegionLocator() { AsyncNonMetaRegionLocator getNonMetaRegionLocator() {

View File

@ -70,6 +70,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* Gets the {@link AsyncTableRegionLocator} for this table. * Gets the {@link AsyncTableRegionLocator} for this table.
*/ */
AsyncTableRegionLocator getRegionLocator(); AsyncTableRegionLocator getRegionLocator();
/** /**
* Get timeout of each rpc request in this Table instance. It will be overridden by a more * Get timeout of each rpc request in this Table instance. It will be overridden by a more
* specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
@ -184,7 +185,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* {@link CompletableFuture}. * {@link CompletableFuture}.
*/ */
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
long amount) { long amount) {
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
} }
@ -204,12 +205,12 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* {@link CompletableFuture}. * {@link CompletableFuture}.
*/ */
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
long amount, Durability durability) { long amount, Durability durability) {
Preconditions.checkNotNull(row, "row is null"); Preconditions.checkNotNull(row, "row is null");
Preconditions.checkNotNull(family, "family is null"); Preconditions.checkNotNull(family, "family is null");
return increment( return increment(
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
} }
/** /**
@ -233,16 +234,15 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* </pre> * </pre>
* *
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more. * any more.
*/ */
@Deprecated @Deprecated
CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
/** /**
* A helper class for sending checkAndMutate request. * A helper class for sending checkAndMutate request.
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more. * any more.
*/ */
@Deprecated @Deprecated
interface CheckAndMutateBuilder { interface CheckAndMutateBuilder {
@ -319,16 +319,15 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* </pre> * </pre>
* *
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more. * any more.
*/ */
@Deprecated @Deprecated
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
/** /**
* A helper class for sending checkAndMutate request with a filter. * A helper class for sending checkAndMutate request with a filter.
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more. * any more.
*/ */
@Deprecated @Deprecated
interface CheckAndMutateWithFilterBuilder { interface CheckAndMutateWithFilterBuilder {
@ -361,9 +360,8 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
} }
/** /**
* checkAndMutate that atomically checks if a row matches the specified condition. If it does, * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
* it performs the specified action. * performs the specified action.
*
* @param checkAndMutate The CheckAndMutate object. * @param checkAndMutate The CheckAndMutate object.
* @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
*/ */
@ -373,22 +371,19 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
* that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
* atomically (and thus, each may fail independently of others). * atomically (and thus, each may fail independently of others).
*
* @param checkAndMutates The list of CheckAndMutate. * @param checkAndMutates The list of CheckAndMutate.
* @return A list of {@link CompletableFuture}s that represent the result for each * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
* CheckAndMutate.
*/ */
List<CompletableFuture<CheckAndMutateResult>> checkAndMutate( List<CompletableFuture<CheckAndMutateResult>>
List<CheckAndMutate> checkAndMutates); checkAndMutate(List<CheckAndMutate> checkAndMutates);
/** /**
* A simple version of batch checkAndMutate. It will fail if there are any failures. * A simple version of batch checkAndMutate. It will fail if there are any failures.
*
* @param checkAndMutates The list of rows to apply. * @param checkAndMutates The list of rows to apply.
* @return A {@link CompletableFuture} that wrapper the result list. * @return A {@link CompletableFuture} that wrapper the result list.
*/ */
default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll( default CompletableFuture<List<CheckAndMutateResult>>
List<CheckAndMutate> checkAndMutates) { checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
return allOf(checkAndMutate(checkAndMutates)); return allOf(checkAndMutate(checkAndMutates));
} }
@ -484,7 +479,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
*/ */
default List<CompletableFuture<Boolean>> exists(List<Get> gets) { default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
return get(toCheckExistenceOnly(gets)).stream() return get(toCheckExistenceOnly(gets)).stream()
.<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
} }
/** /**
@ -592,7 +587,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* @see ServiceCaller * @see ServiceCaller
*/ */
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, byte[] row); ServiceCaller<S, R> callable, byte[] row);
/** /**
* The callback when we want to execute a coprocessor call on a range of regions. * The callback when we want to execute a coprocessor call on a range of regions.
@ -731,5 +726,5 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* for more details. * for more details.
*/ */
<S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback); ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -47,19 +49,21 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override @Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
boolean reload) { boolean reload) {
return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT,
reload, -1L); reload, -1L);
} }
@Override @Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() { public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
if (TableName.isMetaTableName(tableName)) { return tracedFuture(() -> {
return conn.registry.getMetaRegionLocations() if (TableName.isMetaTableName(tableName)) {
.thenApply(locs -> Arrays.asList(locs.getRegionLocations())); return conn.registry.getMetaRegionLocations()
} .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
return ClientMetaTableAccessor }
.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName); return ClientMetaTableAccessor
.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
}, getClass().getSimpleName() + ".getAllRegionLocations");
} }
@Override @Override

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -277,31 +278,33 @@ public class ConnectionFactory {
* @return AsyncConnection object wrapped by CompletableFuture * @return AsyncConnection object wrapped by CompletableFuture
*/ */
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) { final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); return TraceUtil.tracedFuture(() -> {
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
addListener(registry.getClusterId(), (clusterId, error) -> { ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
if (error != null) { addListener(registry.getClusterId(), (clusterId, error) -> {
registry.close(); if (error != null) {
future.completeExceptionally(error); registry.close();
return; future.completeExceptionally(error);
} return;
if (clusterId == null) { }
registry.close(); if (clusterId == null) {
future.completeExceptionally(new IOException("clusterid came back null")); registry.close();
return; future.completeExceptionally(new IOException("clusterid came back null"));
} return;
Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, }
AsyncConnectionImpl.class, AsyncConnection.class); Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
try { AsyncConnectionImpl.class, AsyncConnection.class);
future.complete( try {
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils future.complete(
.newInstance(clazz, conf, registry, clusterId, null, user))); user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
} catch (Exception e) { .newInstance(clazz, conf, registry, clusterId, null, user)));
registry.close(); } catch (Exception e) {
future.completeExceptionally(e); registry.close();
} future.completeExceptionally(e);
}); }
return future; });
return future;
}, ConnectionFactory.class.getSimpleName() + ".createAsyncConnection");
} }
} }

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
@ -132,8 +134,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
this.maxAttempts = builder.maxAttempts; this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt; this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() :
: conn.connConf.getScannerCaching(); conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
} }
@ -158,23 +160,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req, HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert, Converter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return ConnectionUtils.call(controller, loc, stub, req, reqConvert, return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
(s, c, r, done) -> s.mutate(c, r, done), respConverter); (s, c, r, done) -> s.mutate(c, r, done), respConverter);
} }
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req, HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert) { Converter<MutateRequest, byte[], REQ> reqConvert) {
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
return null; return null;
}); });
} }
private static Result toResult(HBaseRpcController controller, MutateResponse resp) private static Result toResult(HBaseRpcController controller, MutateResponse resp)
throws IOException { throws IOException {
if (!resp.hasResult()) { if (!resp.hasResult()) {
return null; return null;
} }
@ -187,9 +189,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert, NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return mutate(controller, loc, stub, req, return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
} }
@ -202,8 +204,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
} }
private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller( private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
R row, long rpcTimeoutNs) { newCaller(R row, long rpcTimeoutNs) {
return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
} }
@ -218,50 +220,58 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public CompletableFuture<Result> get(Get get) { public CompletableFuture<Result> get(Get get) {
return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), return tracedFuture(
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()); RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
"AsyncTable.get", tableName);
} }
@Override @Override
public CompletableFuture<Void> put(Put put) { public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize()); validatePut(put, conn.connConf.getMaxKeyValueSize());
return this.<Void, Put> newCaller(put, writeRpcTimeoutNs) return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest)) put, RequestConverter::buildMutateRequest))
.call(); .call(), "AsyncTable.put", tableName);
} }
@Override @Override
public CompletableFuture<Void> delete(Delete delete) { public CompletableFuture<Void> delete(Delete delete) {
return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) return tracedFuture(
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, () -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
stub, delete, RequestConverter::buildMutateRequest)) .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
.call(); stub, delete, RequestConverter::buildMutateRequest))
.call(),
"AsyncTable.delete", tableName);
} }
@Override @Override
public CompletableFuture<Result> append(Append append) { public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append); checkHasFamilies(append);
long nonceGroup = conn.getNonceGenerator().getNonceGroup(); return tracedFuture(() -> {
long nonce = conn.getNonceGenerator().newNonce(); long nonceGroup = conn.getNonceGenerator().getNonceGroup();
return this.<Result, Append> newCaller(append, rpcTimeoutNs) long nonce = conn.getNonceGenerator().newNonce();
.action( return this.<Result, Append> newCaller(append, rpcTimeoutNs)
(controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller, .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) controller, loc, stub, append, RequestConverter::buildMutateRequest,
.call(); RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.append", tableName);
} }
@Override @Override
public CompletableFuture<Result> increment(Increment increment) { public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment); checkHasFamilies(increment);
long nonceGroup = conn.getNonceGenerator().getNonceGroup(); return tracedFuture(() -> {
long nonce = conn.getNonceGenerator().newNonce(); long nonceGroup = conn.getNonceGenerator().getNonceGroup();
return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) long nonce = conn.getNonceGenerator().newNonce();
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
controller, loc, stub, increment, RequestConverter::buildMutateRequest, .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
RawAsyncTableImpl::toResult)) controller, loc, stub, increment, RequestConverter::buildMutateRequest,
.call(); RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.increment", tableName);
} }
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@ -319,39 +329,43 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) { public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize()); validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck(); preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) return tracedFuture(
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
stub, put, .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed())) (c, r) -> r.getProcessed()))
.call(); .call(),
"AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
} }
@Override @Override
public CompletableFuture<Boolean> thenDelete(Delete delete) { public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck(); preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) return tracedFuture(
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
loc, stub, delete, .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed())) (c, r) -> r.getProcessed()))
.call(); .call(),
"AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
} }
@Override @Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck(); preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(), return tracedFuture(
rpcTimeoutNs) () -> RawAsyncTableImpl.this
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
loc, stub, mutation, .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, mutation,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
CheckAndMutateResult::isSuccess)) null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
.call(); CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
} }
} }
@ -383,37 +397,42 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public CompletableFuture<Boolean> thenPut(Put put) { public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize()); validatePut(put, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed())) (c, r) -> r.getProcessed()))
.call(); .call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
} }
@Override @Override
public CompletableFuture<Boolean> thenDelete(Delete delete) { public CompletableFuture<Boolean> thenDelete(Delete delete) {
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) return tracedFuture(
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
loc, stub, delete, .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed())) (c, r) -> r.getProcessed()))
.call(); .call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
} }
@Override @Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(), return tracedFuture(
rpcTimeoutNs) () -> RawAsyncTableImpl.this
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
loc, stub, mutation, .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, mutation,
filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
CheckAndMutateResult::isSuccess)) timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
.call(); CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
} }
} }
@ -424,63 +443,69 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete return tracedFuture(() -> {
|| checkAndMutate.getAction() instanceof Increment if (checkAndMutate.getAction() instanceof Put ||
|| checkAndMutate.getAction() instanceof Append) { checkAndMutate.getAction() instanceof Delete ||
Mutation mutation = (Mutation) checkAndMutate.getAction(); checkAndMutate.getAction() instanceof Increment ||
if (mutation instanceof Put) { checkAndMutate.getAction() instanceof Append) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
}
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
rpcTimeoutNs)
.action(
(controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this
.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(controller, loc, stub,
rowMutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
} else {
CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
future.completeExceptionally(new DoNotRetryIOException(
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
} }
long nonceGroup = conn.getNonceGenerator().getNonceGroup(); }, "AsyncTable.checkAndMutate", tableName);
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
mutation.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
controller, loc, stub, rowMutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
} else {
CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
future.completeExceptionally(new DoNotRetryIOException(
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
} }
@Override @Override
public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate( public List<CompletableFuture<CheckAndMutateResult>>
List<CheckAndMutate> checkAndMutates) { checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return batch(checkAndMutates, rpcTimeoutNs).stream() return tracedFutures(
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()); () -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
"AsyncTable.checkAndMutateList", tableName);
} }
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method. // so here I write a new method as I do not want to change the abstraction of call method.
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert, Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
Function<RES, RESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>(); CompletableFuture<RESP> future = new CompletableFuture<>();
try { try {
byte[] regionName = loc.getRegion().getRegionName(); byte[] regionName = loc.getRegion().getRegionName();
@ -499,12 +524,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
loc.getServerName(), multiResp); loc.getServerName(), multiResp);
Throwable ex = multiResp.getException(regionName); Throwable ex = multiResp.getException(regionName);
if (ex != null) { if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex future.completeExceptionally(ex instanceof IOException ? ex :
: new IOException( new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else { } else {
future.complete(respConverter future.complete(
.apply((RES) multiResp.getResults().get(regionName).result.get(0))); respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
} }
} catch (IOException e) { } catch (IOException e) {
future.completeExceptionally(e); future.completeExceptionally(e);
@ -523,12 +548,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce(); long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), return tracedFuture(
writeRpcTimeoutNs).action((controller, loc, stub) -> () -> this
this.<Result, Result> mutateRow(controller, loc, stub, mutations, .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
(rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp)) resp -> resp))
.call(); .call(),
"AsyncTable.mutateRow", tableName);
} }
private Scan setDefaultScanConfig(Scan scan) { private Scan setDefaultScanConfig(Scan scan) {
@ -564,46 +591,48 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public CompletableFuture<List<Result>> scanAll(Scan scan) { public CompletableFuture<List<Result>> scanAll(Scan scan) {
CompletableFuture<List<Result>> future = new CompletableFuture<>(); return tracedFuture(() -> {
List<Result> scanResults = new ArrayList<>(); CompletableFuture<List<Result>> future = new CompletableFuture<>();
scan(scan, new AdvancedScanResultConsumer() { List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
@Override @Override
public void onNext(Result[] results, ScanController controller) { public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results)); scanResults.addAll(Arrays.asList(results));
} }
@Override @Override
public void onError(Throwable error) { public void onError(Throwable error) {
future.completeExceptionally(error); future.completeExceptionally(error);
} }
@Override @Override
public void onComplete() { public void onComplete() {
future.complete(scanResults); future.complete(scanResults);
} }
}); });
return future; return future;
}, "AsyncTable.scanAll", tableName);
} }
@Override @Override
public List<CompletableFuture<Result>> get(List<Get> gets) { public List<CompletableFuture<Result>> get(List<Get> gets) {
return batch(gets, readRpcTimeoutNs); return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
} }
@Override @Override
public List<CompletableFuture<Void>> put(List<Put> puts) { public List<CompletableFuture<Void>> put(List<Put> puts) {
return voidMutate(puts); return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
} }
@Override @Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) { public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
return voidMutate(deletes); return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
} }
@Override @Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return batch(actions, rpcTimeoutNs); return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
} }
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
@ -660,7 +689,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs); region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel); S stub = stubMaker.apply(channel);
@ -678,7 +707,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, byte[] row) { ServiceCaller<S, R> callable, byte[] row) {
return coprocessorService(stubMaker, callable, null, row); return coprocessorService(stubMaker, callable, null, row);
} }
@ -700,9 +729,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
if (error != null) { if (error != null) {
callback.onError(error); callback.onError(error);
return; return;
@ -731,7 +760,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
private final class CoprocessorServiceBuilderImpl<S, R> private final class CoprocessorServiceBuilderImpl<S, R>
implements CoprocessorServiceBuilder<S, R> { implements CoprocessorServiceBuilder<S, R> {
private final Function<RpcChannel, S> stubMaker; private final Function<RpcChannel, S> stubMaker;
@ -748,7 +777,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private boolean endKeyInclusive; private boolean endKeyInclusive;
public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
this.callable = Preconditions.checkNotNull(callable, "callable is null"); this.callable = Preconditions.checkNotNull(callable, "callable is null");
this.callback = Preconditions.checkNotNull(callback, "callback is null"); this.callback = Preconditions.checkNotNull(callback, "callback is null");
@ -785,8 +814,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) { CoprocessorCallback<R> callback) {
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
} }
} }

View File

@ -393,10 +393,11 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} }
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Message param, Message returnType, final User ticket, final Address addr,
final Address addr, final RpcCallback<Message> callback) { final RpcCallback<Message> callback) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName()) Span span = TraceUtil.createSpan("RpcClient.callMethod")
.startSpan(); .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName());
try (Scope scope = span.makeCurrent()) { try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime()); cs.setStartTime(EnvironmentEdgeManager.currentTime());

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncRegionLocatorTracing {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class);
private static Configuration CONF = HBaseConfiguration.create();
private AsyncConnectionImpl conn;
private RegionLocations locs;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@Before
public void setUp() throws IOException {
RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build();
locs = new RegionLocations(
new HRegionLocation(metaRegionInfo,
ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis())),
new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 1),
ServerName.valueOf("127.0.0.2", 12345, System.currentTimeMillis())),
new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 2),
ServerName.valueOf("127.0.0.3", 12345, System.currentTimeMillis())));
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) {
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return CompletableFuture.completedFuture(locs);
}
}, "test", null, UserProvider.instantiate(CONF).getCurrent());
}
@After
public void tearDown() throws IOException {
Closeables.close(conn, true);
}
private SpanData waitSpan(String name) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
}
@Test
public void testClearCache() {
conn.getLocator().clearCache();
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
}
@Test
public void testClearCacheServerName() {
ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis());
conn.getLocator().clearCache(sn);
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
assertEquals(sn.toString(), span.getAttributes().get(TraceUtil.SERVER_NAME_KEY));
}
@Test
public void testClearCacheTableName() {
conn.getLocator().clearCache(TableName.META_TABLE_NAME);
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
span.getAttributes().get(TraceUtil.TABLE_KEY));
}
@Test
public void testGetRegionLocation() {
conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join();
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
span.getAttributes().get(TraceUtil.TABLE_KEY));
List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
assertEquals(1, regionNames.size());
assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(),
regionNames.get(0));
}
@Test
public void testGetRegionLocations() {
conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join();
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
span.getAttributes().get(TraceUtil.TABLE_KEY));
List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
assertEquals(3, regionNames.size());
for (int i = 0; i < 3; i++) {
assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(),
regionNames.get(i));
}
}
}

View File

@ -0,0 +1,414 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncTableTracing {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
private static Configuration CONF = HBaseConfiguration.create();
private ClientService.Interface stub;
private AsyncConnection conn;
private AsyncTable<?> table;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@Before
public void setUp() throws IOException {
stub = mock(ClientService.Interface.class);
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.addResults(ProtobufUtil.toResult(result));
if (req.getLimitOfRows() == 1) {
builder.setMoreResultsInRegion(false).setMoreResults(false);
} else {
builder.setMoreResultsInRegion(true).setMoreResults(true);
}
ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
}
}
return null;
}
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ClientProtos.MultiResponse resp =
ClientProtos.MultiResponse.newBuilder()
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
.build();
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
}
}).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
MutateResponse resp;
switch (req.getMutateType()) {
case INCREMENT:
ColumnValue value = req.getColumnValue(0);
QualifierValue qvalue = value.getQualifierValue(0);
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
.setQualifier(qvalue.getQualifier().toByteArray())
.setValue(qvalue.getValue().toByteArray()).build();
resp = MutateResponse.newBuilder()
.setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
break;
default:
resp = MutateResponse.getDefaultInstance();
break;
}
RpcCallback<MutateResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
}
}).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<GetResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
return null;
}
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
UserProvider.instantiate(CONF).getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
Answer<CompletableFuture<HRegionLocation>> answer =
new Answer<CompletableFuture<HRegionLocation>>() {
@Override
public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
throws Throwable {
TableName tableName = invocation.getArgument(0);
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
HRegionLocation loc = new HRegionLocation(info, serverName);
return CompletableFuture.completedFuture(loc);
}
};
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
any(RegionLocateType.class), anyLong());
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
anyInt(), any(RegionLocateType.class), anyLong());
return locator;
}
@Override
ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
return stub;
}
};
table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
}
@After
public void tearDown() throws IOException {
Closeables.close(conn, true);
}
private void assertTrace(String methodName) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream()
.anyMatch(span -> span.getName().equals("AsyncTable." + methodName) &&
span.getKind() == Kind.INTERNAL && span.hasEnded()));
SpanData data = traceRule.getSpans().stream()
.filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get();
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
TableName tableName = table.getName();
assertEquals(tableName.getNamespaceAsString(),
data.getAttributes().get(TraceUtil.NAMESPACE_KEY));
assertEquals(tableName.getNameAsString(), data.getAttributes().get(TraceUtil.TABLE_KEY));
}
@Test
public void testExists() {
table.exists(new Get(Bytes.toBytes(0))).join();
assertTrace("get");
}
@Test
public void testGet() {
table.get(new Get(Bytes.toBytes(0))).join();
assertTrace("get");
}
@Test
public void testPut() {
table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
assertTrace("put");
}
@Test
public void testDelete() {
table.delete(new Delete(Bytes.toBytes(0))).join();
assertTrace("delete");
}
@Test
public void testAppend() {
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
assertTrace("append");
}
@Test
public void testIncrement() {
table
.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
.join();
assertTrace("increment");
}
@Test
public void testIncrementColumnValue1() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
.join();
assertTrace("increment");
}
@Test
public void testIncrementColumnValue2() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
Durability.ASYNC_WAL).join();
assertTrace("increment");
}
@Test
public void testCheckAndMutate() {
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))).join();
assertTrace("checkAndMutate");
}
@Test
public void testCheckAndMutateList() {
CompletableFuture
.allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("checkAndMutateList");
}
@Test
public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("checkAndMutateList");
}
@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
assertTrace("mutateRow");
}
@Test
public void testScanAll() throws IOException {
table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
assertTrace("scanAll");
}
@Test
public void testExistsList() {
CompletableFuture
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("getList");
}
@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("getList");
}
@Test
public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("getList");
}
@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("getList");
}
@Test
public void testPutList() {
CompletableFuture
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("putList");
}
@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("putList");
}
@Test
public void testDeleteList() {
CompletableFuture
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("deleteList");
}
@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("deleteList");
}
@Test
public void testBatch() {
CompletableFuture
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("batch");
}
@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("batch");
}
@Test
public void testConnClose() throws IOException {
conn.close();
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream()
.anyMatch(span -> span.getName().equals("AsyncConnection.close") &&
span.getKind() == Kind.INTERNAL && span.hasEnded()));
SpanData data = traceRule.getSpans().stream()
.filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get();
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
}
}

View File

@ -18,7 +18,19 @@
package org.apache.hadoop.hbase.trace; package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -26,10 +38,132 @@ public final class TraceUtil {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase"; private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase";
public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
public static final AttributeKey<String> RPC_SERVICE_KEY =
AttributeKey.stringKey("db.hbase.rpc.service");
public static final AttributeKey<String> RPC_METHOD_KEY =
AttributeKey.stringKey("db.hbase.rpc.method");
public static final AttributeKey<String> SERVER_NAME_KEY =
AttributeKey.stringKey("db.hbase.server.name");
private TraceUtil() { private TraceUtil() {
} }
public static Tracer getGlobalTracer() { public static Tracer getGlobalTracer() {
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
} }
/**
* Create a {@link Kind#INTERNAL} span.
*/
public static Span createSpan(String name) {
return createSpan(name, Kind.INTERNAL);
}
/**
* Create a {@link Kind#INTERNAL} span and set table related attributes.
*/
public static Span createTableSpan(String spanName, TableName tableName) {
return createSpan(spanName).setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString())
.setAttribute(TABLE_KEY, tableName.getNameAsString());
}
/**
* Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one
* {@link Kind#CLIENT} span and one {@link Kind#SERVER} span for a traced request, so use this
* with caution when you want to create spans with kind other than {@link Kind#INTERNAL}.
*/
private static Span createSpan(String name, Kind kind) {
return getGlobalTracer().spanBuilder(name).setSpanKind(kind).startSpan();
}
/**
* Create a span which parent is from remote, i.e, passed through rpc.
* </p>
* We will set the kind of the returned span to {@link Kind#SERVER}, as this should be the top
* most span at server side.
*/
public static Span createRemoteSpan(String name, Context ctx) {
return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan();
}
/**
* Trace an asynchronous operation for a table.
*/
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
String spanName, TableName tableName) {
Span span = createTableSpan(spanName, tableName);
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
endSpan(future, span);
return future;
}
}
/**
* Trace an asynchronous operation.
*/
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
String spanName) {
Span span = createSpan(spanName);
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
endSpan(future, span);
return future;
}
}
/**
* Trace an asynchronous operation, and finish the create {@link Span} when all the given
* {@code futures} are completed.
*/
public static <T> List<CompletableFuture<T>> tracedFutures(
Supplier<List<CompletableFuture<T>>> action, String spanName, TableName tableName) {
Span span = createTableSpan(spanName, tableName);
try (Scope scope = span.makeCurrent()) {
List<CompletableFuture<T>> futures = action.get();
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
return futures;
}
}
/**
* Finish the {@code span} when the given {@code future} is completed.
*/
private static void endSpan(CompletableFuture<?> future, Span span) {
FutureUtils.addListener(future, (resp, error) -> {
if (error != null) {
span.recordException(error);
span.setStatus(StatusCode.ERROR);
} else {
span.setStatus(StatusCode.OK);
}
span.end();
});
}
public static void trace(Runnable action, String spanName) {
trace(action, () -> createSpan(spanName));
}
public static void trace(Runnable action, Supplier<Span> creator) {
Span span = creator.get();
try (Scope scope = span.makeCurrent()) {
action.run();
span.setStatus(StatusCode.OK);
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
} finally {
span.end();
}
}
} }

View File

@ -121,9 +121,10 @@ public class CallRunner {
RpcServer.CurCall.set(call); RpcServer.CurCall.set(call);
String serviceName = getServiceName(); String serviceName = getServiceName();
String methodName = getMethodName(); String methodName = getMethodName();
String traceString = serviceName + "." + methodName; Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString) .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan(); .setAttribute(TraceUtil.RPC_SERVICE_KEY, serviceName)
.setAttribute(TraceUtil.RPC_METHOD_KEY, methodName);
try (Scope traceScope = span.makeCurrent()) { try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) { if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress(); InetSocketAddress address = rpcServer.getListenerAddress();

View File

@ -629,8 +629,7 @@ abstract class ServerRpcConnection implements Closeable {
}; };
Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), header.getTraceInfo(), getter); .extract(Context.current(), header.getTraceInfo(), getter);
Span span = Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan();
try (Scope scope = span.makeCurrent()) { try (Scope scope = span.makeCurrent()) {
int id = header.getCallId(); int id = header.getCallId();
if (RpcServer.LOG.isTraceEnabled()) { if (RpcServer.LOG.isTraceEnabled()) {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -448,6 +449,19 @@ public abstract class AbstractTestIPC {
} }
} }
private SpanData waitSpan(String name) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
}
private void assertRpcAttribute(SpanData data, String methodName) {
assertEquals(SERVICE.getDescriptorForType().getName(),
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
assertEquals(methodName,
data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
}
@Test @Test
public void testTracing() throws IOException, ServiceException { public void testTracing() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer(null, "testRpcServer",
@ -457,9 +471,8 @@ public abstract class AbstractTestIPC {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause");
.anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause"))); assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause");
assertSameTraceId(); assertSameTraceId();
for (SpanData data : traceRule.getSpans()) { for (SpanData data : traceRule.getSpans()) {
assertThat( assertThat(
@ -471,9 +484,8 @@ public abstract class AbstractTestIPC {
traceRule.clearSpans(); traceRule.clearSpans();
assertThrows(ServiceException.class, assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance())); () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error");
.anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error"))); assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error");
assertSameTraceId(); assertSameTraceId();
for (SpanData data : traceRule.getSpans()) { for (SpanData data : traceRule.getSpans()) {
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());

View File

@ -1718,12 +1718,8 @@
<junit.version>4.13</junit.version> <junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version> <hamcrest.version>1.3</hamcrest.version>
<opentelemetry.version>0.13.1</opentelemetry.version> <opentelemetry.version>0.13.1</opentelemetry.version>
<<<<<<< HEAD
<log4j2.version>2.14.1</log4j2.version>
=======
<opentelemetry-instrumentation.version>0.13.0</opentelemetry-instrumentation.version> <opentelemetry-instrumentation.version>0.13.0</opentelemetry-instrumentation.version>
<log4j.version>1.2.17</log4j.version> <log4j2.version>2.14.1</log4j2.version>
>>>>>>> HBASE-25424 Find a way to config OpenTelemetry tracing without direct… (#2808)
<mockito-core.version>2.28.2</mockito-core.version> <mockito-core.version>2.28.2</mockito-core.version>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version> <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<thrift.path>thrift</thrift.path> <thrift.path>thrift</thrift.path>
@ -2415,7 +2411,7 @@
<dependency> <dependency>
<groupId>io.opentelemetry.javaagent</groupId> <groupId>io.opentelemetry.javaagent</groupId>
<artifactId>opentelemetry-javaagent</artifactId> <artifactId>opentelemetry-javaagent</artifactId>
<version>${opentelemetry-instrumentation.version}</version> <version>${opentelemetry.version}</version>
<classifier>all</classifier> <classifier>all</classifier>
</dependency> </dependency>
<dependency> <dependency>