diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 75971ad610c..d04b5f2cebe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -64,8 +64,8 @@ public interface AsyncConnection extends Closeable {
/**
* Retrieve an {@link AsyncTable} implementation for accessing a table.
*
- * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if
- * you want to customize some configs.
+ * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if you
+ * want to customize some configs.
*
* This method no longer checks table existence. If the table does not exist, an exception will
* be thrown only when the first operation is attempted.
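To make the getTable vs. getTableBuilder distinction concrete, a minimal usage sketch (the table name and tuning values are hypothetical; the setters are from the public AsyncTableBuilder API):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;

public class TableBuilderSketch {
  static AsyncTable<AdvancedScanResultConsumer> tunedTable(AsyncConnection conn) {
    // conn.getTable(...) would use the defaults; the builder lets us override them.
    return conn.getTableBuilder(TableName.valueOf("my_table"))
        .setRpcTimeout(500, TimeUnit.MILLISECONDS) // per-request RPC timeout
        .setMaxAttempts(3) // overall retry budget
        .build();
  }
}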
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 2ed73992d7f..b919ee1892b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -27,6 +27,8 @@ import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRI
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
@@ -153,14 +156,13 @@ class AsyncConnectionImpl implements AsyncConnection {
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
} else {
try {
- listener = new ClusterStatusListener(
- new ClusterStatusListener.DeadServerHandler() {
- @Override
- public void newDead(ServerName sn) {
- locator.clearCache(sn);
- rpcClient.cancelConnections(sn);
- }
- }, conf, listenerClass);
+ listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
+ @Override
+ public void newDead(ServerName sn) {
+ locator.clearCache(sn);
+ rpcClient.cancelConnections(sn);
+ }
+ }, conf, listenerClass);
} catch (IOException e) {
LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
}
@@ -195,27 +197,29 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public void close() {
- // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
- // simple volatile flag.
- if (closed) {
- return;
- }
- LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
- if(LOG.isDebugEnabled()){
- logCallStack(Thread.currentThread().getStackTrace());
- }
- IOUtils.closeQuietly(clusterStatusListener,
- e -> LOG.warn("failed to close clusterStatusListener", e));
- IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
- IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
- synchronized (this) {
- if (choreService != null) {
- choreService.shutdown();
- choreService = null;
+ TraceUtil.trace(() -> {
+ // As the code below is safe to be executed in parallel, here we do not use CAS or lock,
+ // just a simple volatile flag.
+ if (closed) {
+ return;
}
- }
- metrics.ifPresent(MetricsConnection::shutdown);
- closed = true;
+ LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
+ if (LOG.isDebugEnabled()) {
+ logCallStack(Thread.currentThread().getStackTrace());
+ }
+ IOUtils.closeQuietly(clusterStatusListener,
+ e -> LOG.warn("failed to close clusterStatusListener", e));
+ IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
+ IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
+ synchronized (this) {
+ if (choreService != null) {
+ choreService.shutdown();
+ choreService = null;
+ }
+ }
+ metrics.ifPresent(MetricsConnection::shutdown);
+ closed = true;
+ }, "AsyncConnection.close");
}
private void logCallStack(StackTraceElement[] stackTraceElements) {
@@ -320,7 +324,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
- ExecutorService pool) {
+ ExecutorService pool) {
return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
@Override
@@ -361,35 +365,43 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
- ExecutorService pool) {
+ ExecutorService pool) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
@Override
public CompletableFuture<Hbck> getHbck() {
- CompletableFuture<Hbck> future = new CompletableFuture<>();
- addListener(registry.getActiveMaster(), (sn, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else {
- try {
- future.complete(getHbck(sn));
- } catch (IOException e) {
- future.completeExceptionally(e);
+ return TraceUtil.tracedFuture(() -> {
+ CompletableFuture<Hbck> future = new CompletableFuture<>();
+ addListener(registry.getActiveMaster(), (sn, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ try {
+ future.complete(getHbck(sn));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
}
- }
- });
- return future;
+ });
+ return future;
+ }, getClass().getName() + ".getHbck");
}
@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
- // we will not create a new connection when creating a new protobuf stub, and for hbck there
- // will be no performance consideration, so for simplification we will create a new stub every
- // time instead of caching the stub here.
- return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
- rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+ Span span = TraceUtil.createSpan(getClass().getName() + ".getHbck")
+ .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
+ try (Scope scope = span.makeCurrent()) {
+ // we will not create a new connection when creating a new protobuf stub, and for hbck there
+ // will be no performance consideration, so for simplification we will create a new stub every
+ // time instead of caching the stub here.
+ return new HBaseHbck(
+ MasterProtos.HbckService
+ .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
+ rpcControllerFactory);
+ }
}
@Override
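The pattern behind the TraceUtil helpers introduced above: open a span, keep it current while the work is kicked off, and end it either immediately (synchronous work) or when the returned future completes. A minimal sketch using only the stock OpenTelemetry API; the real TraceUtil may differ, and the tracer name here is hypothetical:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class TraceSketch {
  private static final Tracer TRACER = GlobalOpenTelemetry.getTracer("hbase-client-sketch");

  // Synchronous variant, as TraceUtil.trace(runnable, name) is used above.
  public static void trace(Runnable action, String spanName) {
    Span span = TRACER.spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      action.run();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      span.recordException(t);
      span.setStatus(StatusCode.ERROR);
      throw t;
    } finally {
      span.end();
    }
  }

  // Asynchronous variant: the span stays open until the future completes,
  // as TraceUtil.tracedFuture(supplier, name) is used above.
  public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
      String spanName) {
    Span span = TRACER.spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      CompletableFuture<T> future = action.get();
      future.whenComplete((resp, error) -> {
        if (error != null) {
          span.recordException(error);
          span.setStatus(StatusCode.ERROR);
        } else {
          span.setStatus(StatusCode.OK);
        }
        span.end();
      });
      return future;
    }
  }
}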
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index d50070ae8a9..1d0efcc01df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,16 +18,28 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY;
+import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY;
+import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
+import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
@@ -60,7 +72,7 @@ class AsyncRegionLocator {
}
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
- Supplier<String> timeoutMsg) {
+ Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
@@ -83,64 +95,101 @@ class AsyncRegionLocator {
return TableName.isMetaTableName(tableName);
}
+ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
+ Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
+ Span span = createTableSpan(getClass().getSimpleName() + "." + methodName, tableName);
+ try (Scope scope = span.makeCurrent()) {
+ CompletableFuture<T> future = action.get();
+ FutureUtils.addListener(future, (resp, error) -> {
+ if (error != null) {
+ span.recordException(error);
+ span.setStatus(StatusCode.ERROR);
+ } else {
+ List<String> regionNames = getRegionNames.apply(resp);
+ if (!regionNames.isEmpty()) {
+ span.setAttribute(REGION_NAMES_KEY, regionNames);
+ }
+ span.setStatus(StatusCode.OK);
+ }
+ span.end();
+ });
+ return future;
+ }
+ }
+
+ private List<String> getRegionName(RegionLocations locs) {
+ List<String> names = new ArrayList<>();
+ for (HRegionLocation loc : locs.getRegionLocations()) {
+ if (loc != null) {
+ names.add(loc.getRegion().getRegionNameAsString());
+ }
+ }
+ return names;
+ }
+
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
- RegionLocateType type, boolean reload, long timeoutNs) {
- CompletableFuture<RegionLocations> future = isMeta(tableName)
- ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
- : nonMetaRegionLocator.getRegionLocations(tableName, row,
- RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
- return withTimeout(future, timeoutNs,
- () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
- "ms) waiting for region locations for " + tableName + ", row='" +
- Bytes.toStringBinary(row) + "'");
+ RegionLocateType type, boolean reload, long timeoutNs) {
+ return tracedLocationFuture(() -> {
+ CompletableFuture<RegionLocations> future = isMeta(tableName) ?
+ metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
+ nonMetaRegionLocator.getRegionLocations(tableName, row,
+ RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+ return withTimeout(future, timeoutNs,
+ () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+ "ms) waiting for region locations for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "'");
+ }, this::getRegionName, tableName, "getRegionLocations");
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
- // meta region can not be split right now so we always call the same method.
- // Change it later if the meta table can have more than one regions.
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- CompletableFuture<RegionLocations> locsFuture =
- isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
- : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
- addListener(locsFuture, (locs, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- future.completeExceptionally(
- new RegionOfflineException("No location for " + tableName + ", row='" +
- Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
- } else if (loc.getServerName() == null) {
- future.completeExceptionally(
- new RegionOfflineException("No server address listed for region '" +
- loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
- "', locateType=" + type + ", replicaId=" + replicaId));
- } else {
- future.complete(loc);
- }
- });
- return withTimeout(future, timeoutNs,
- () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
- "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
- "', replicaId=" + replicaId);
+ int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
+ return tracedLocationFuture(() -> {
+ // The meta region cannot be split right now, so we always call the same method.
+ // Change it later if the meta table can have more than one region.
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ CompletableFuture<RegionLocations> locsFuture =
+ isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) :
+ nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+ addListener(locsFuture, (locs, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ future.completeExceptionally(
+ new RegionOfflineException("No location for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+ } else if (loc.getServerName() == null) {
+ future.completeExceptionally(
+ new RegionOfflineException("No server address listed for region '" +
+ loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
+ "', locateType=" + type + ", replicaId=" + replicaId));
+ } else {
+ future.complete(loc);
+ }
+ });
+ return withTimeout(future, timeoutNs,
+ () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+ "ms) waiting for region location for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
+ }, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
+ "getRegionLocation");
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- int replicaId, RegionLocateType type, long timeoutNs) {
+ int replicaId, RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, boolean reload, long timeoutNs) {
+ RegionLocateType type, boolean reload, long timeoutNs) {
return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, long timeoutNs) {
+ RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, type, false, timeoutNs);
}
@@ -153,24 +202,31 @@ class AsyncRegionLocator {
}
void clearCache(TableName tableName) {
- LOG.debug("Clear meta cache for {}", tableName);
- if (tableName.equals(META_TABLE_NAME)) {
- metaRegionLocator.clearCache();
- } else {
- nonMetaRegionLocator.clearCache(tableName);
- }
+ TraceUtil.trace(() -> {
+ LOG.debug("Clear meta cache for {}", tableName);
+ if (tableName.equals(META_TABLE_NAME)) {
+ metaRegionLocator.clearCache();
+ } else {
+ nonMetaRegionLocator.clearCache(tableName);
+ }
+ }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}
void clearCache(ServerName serverName) {
- LOG.debug("Clear meta cache for {}", serverName);
- metaRegionLocator.clearCache(serverName);
- nonMetaRegionLocator.clearCache(serverName);
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+ TraceUtil.trace(() -> {
+ LOG.debug("Clear meta cache for {}", serverName);
+ metaRegionLocator.clearCache(serverName);
+ nonMetaRegionLocator.clearCache(serverName);
+ conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+ }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
+ serverName.getServerName()));
}
void clearCache() {
- metaRegionLocator.clearCache();
- nonMetaRegionLocator.clearCache();
+ TraceUtil.trace(() -> {
+ metaRegionLocator.clearCache();
+ nonMetaRegionLocator.clearCache();
+ }, "AsyncRegionLocator.clearCache");
}
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
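REGION_NAMES_KEY and SERVER_NAME_KEY above are typed OpenTelemetry AttributeKey constants. A minimal sketch of declaring and stamping such keys; the key name strings here are illustrative assumptions, not necessarily what TraceUtil defines:

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import java.util.List;

public final class AttributeSketch {
  // Hypothetical key names; the real TraceUtil constants may use different strings.
  static final AttributeKey<String> SERVER_NAME_KEY =
      AttributeKey.stringKey("db.hbase.server.name");
  static final AttributeKey<List<String>> REGION_NAMES_KEY =
      AttributeKey.stringArrayKey("db.hbase.regions");

  static void stamp(Span span, String serverName, List<String> regionNames) {
    // Typed keys are checked at compile time at the call site.
    span.setAttribute(SERVER_NAME_KEY, serverName);
    span.setAttribute(REGION_NAMES_KEY, regionNames);
  }
}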
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 7473ed0ad91..c7003e05237 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -70,6 +70,7 @@ public interface AsyncTable {
* Gets the {@link AsyncTableRegionLocator} for this table.
*/
AsyncTableRegionLocator getRegionLocator();
+
/**
* Get timeout of each rpc request in this Table instance. It will be overridden by a more
* specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
@@ -184,7 +185,7 @@ public interface AsyncTable {
* {@link CompletableFuture}.
*/
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
- long amount) {
+ long amount) {
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
}
@@ -204,12 +205,12 @@ public interface AsyncTable {
* {@link CompletableFuture}.
*/
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
- long amount, Durability durability) {
+ long amount, Durability durability) {
Preconditions.checkNotNull(row, "row is null");
Preconditions.checkNotNull(family, "family is null");
return increment(
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
- .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
+ .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
}
/**
@@ -361,9 +362,8 @@ public interface AsyncTable {
}
/**
- * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
- * it performs the specified action.
- *
+ * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
+ * performs the specified action.
* @param checkAndMutate The CheckAndMutate object.
* @return A {@link CompletableFuture} that represents the result of the CheckAndMutate.
*/
@@ -373,22 +373,19 @@ public interface AsyncTable {
* Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
* that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
* atomically (and thus, each may fail independently of others).
- *
* @param checkAndMutates The list of CheckAndMutate.
- * @return A list of {@link CompletableFuture}s that represent the result for each
- * CheckAndMutate.
+ * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
*/
- List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
- List<CheckAndMutate> checkAndMutates);
+ List<CompletableFuture<CheckAndMutateResult>>
+ checkAndMutate(List<CheckAndMutate> checkAndMutates);
/**
* A simple version of batch checkAndMutate. It will fail if there are any failures.
- *
* @param checkAndMutates The list of rows to apply.
* @return A {@link CompletableFuture} that wrapper the result list.
*/
- default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll(
- List<CheckAndMutate> checkAndMutates) {
+ default CompletableFuture<List<CheckAndMutateResult>>
+ checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
return allOf(checkAndMutate(checkAndMutates));
}
@@ -484,7 +481,7 @@ public interface AsyncTable {
*/
default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
return get(toCheckExistenceOnly(gets)).stream()
- .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+ .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
}
/**
@@ -592,7 +589,7 @@ public interface AsyncTable {
* @see ServiceCaller
*/
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, byte[] row);
+ ServiceCaller<S, R> callable, byte[] row);
/**
* The callback when we want to execute a coprocessor call on a range of regions.
@@ -731,5 +728,5 @@ public interface AsyncTable {
* for more details.
*/
<S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
}
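As a concrete illustration of the checkAndMutate contract described above, a minimal usage sketch (row, family, qualifier, and values are hypothetical):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateSketch {
  static CompletableFuture<Boolean> putIfUnchanged(AsyncTable<?> table) {
    byte[] row = Bytes.toBytes("row-1"); // hypothetical row
    CheckAndMutate cam = CheckAndMutate.newBuilder(row)
        .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("expected"))
        .build(new Put(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"),
            Bytes.toBytes("updated")));
    // The check and the mutation are applied atomically on the server side.
    return table.checkAndMutate(cam).thenApply(CheckAndMutateResult::isSuccess);
  }
}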
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index fa3ea1ca4df..d5b275d2a77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
+
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -47,19 +49,21 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
- boolean reload) {
+ boolean reload) {
return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT,
reload, -1L);
}
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
- if (TableName.isMetaTableName(tableName)) {
- return conn.registry.getMetaRegionLocations()
- .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
- }
- return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
- tableName);
+ return tracedFuture(() -> {
+ if (TableName.isMetaTableName(tableName)) {
+ return conn.registry.getMetaRegionLocations()
+ .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
+ }
+ return AsyncMetaTableAccessor
+ .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
+ }, getClass().getSimpleName() + ".getAllRegionLocations");
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index f91b2107c6b..627e8d26a14 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -280,30 +281,32 @@ public class ConnectionFactory {
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) {
- CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
- ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
- addListener(registry.getClusterId(), (clusterId, error) -> {
- if (error != null) {
- registry.close();
- future.completeExceptionally(error);
- return;
- }
- if (clusterId == null) {
- registry.close();
- future.completeExceptionally(new IOException("clusterid came back null"));
- return;
- }
- Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
- AsyncConnectionImpl.class, AsyncConnection.class);
- try {
- future.complete(
- user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
- .newInstance(clazz, conf, registry, clusterId, user)));
- } catch (Exception e) {
- registry.close();
- future.completeExceptionally(e);
- }
- });
- return future;
+ return TraceUtil.tracedFuture(() -> {
+ CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+ ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
+ addListener(registry.getClusterId(), (clusterId, error) -> {
+ if (error != null) {
+ registry.close();
+ future.completeExceptionally(error);
+ return;
+ }
+ if (clusterId == null) {
+ registry.close();
+ future.completeExceptionally(new IOException("clusterid came back null"));
+ return;
+ }
+ Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
+ AsyncConnectionImpl.class, AsyncConnection.class);
+ try {
+ future.complete(user.runAs(
+ (PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+ .newInstance(clazz, conf, registry, clusterId, user)));
+ } catch (Exception e) {
+ registry.close();
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
+ }, ConnectionFactory.class.getSimpleName() + ".createAsyncConnection");
}
}
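For reference, a minimal sketch of consuming the traced factory method; the table and row are hypothetical:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncConnectionSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create(); // assumes hbase-site.xml on the classpath
    // The whole bootstrap (cluster id lookup included) is now recorded under a
    // "ConnectionFactory.createAsyncConnection" span.
    AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).join();
    try {
      Result r = conn.getTable(TableName.valueOf("my_table")) // hypothetical table
          .get(new Get(Bytes.toBytes("row-1"))).join();
      System.out.println(r);
    } finally {
      conn.close();
    }
  }
}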
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index bed896eaa45..f637e47f60a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
@@ -131,8 +133,8 @@ class RawAsyncTableImpl implements AsyncTable {
}
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
- this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
- : conn.connConf.getScannerCaching();
+ this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() :
+ conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@@ -204,15 +206,15 @@ class RawAsyncTableImpl implements AsyncTable {
}
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub, REQ req,
- Converter<MutateRequest, byte[], REQ> reqConvert) {
+ HRegionLocation loc, ClientService.Interface stub, REQ req,
+ Converter<MutateRequest, byte[], REQ> reqConvert) {
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
return null;
});
}
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
- throws IOException {
+ throws IOException {
if (!resp.hasResult()) {
return null;
}
@@ -225,9 +227,9 @@ class RawAsyncTableImpl implements AsyncTable {
}
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
- HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
- NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
- Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+ HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
+ NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
+ Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
@@ -240,8 +242,8 @@ class RawAsyncTableImpl implements AsyncTable {
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
- private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
- R row, long rpcTimeoutNs) {
+ private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
+ newCaller(R row, long rpcTimeoutNs) {
return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
}
@@ -256,50 +258,58 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public CompletableFuture<Result> get(Get get) {
- return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
- RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
- conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
+ return tracedFuture(
+ () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
+ RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
+ conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
+ "AsyncTable.get", tableName);
}
@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
- return this.<Void> newCaller(put, writeRpcTimeoutNs)
+ return tracedFuture(() -> this.<Void> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
- .call();
+ .call(), "AsyncTable.put", tableName);
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
- return this.<Void> newCaller(delete, writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
- stub, delete, RequestConverter::buildMutateRequest))
- .call();
+ return tracedFuture(
+ () -> this.<Void> newCaller(delete, writeRpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+ stub, delete, RequestConverter::buildMutateRequest))
+ .call(),
+ "AsyncTable.delete", tableName);
}
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
- long nonceGroup = conn.getNonceGenerator().getNonceGroup();
- long nonce = conn.getNonceGenerator().newNonce();
- return this.<Result> newCaller(append, rpcTimeoutNs)
- .action(
- (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
- loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
- .call();
+ return tracedFuture(() -> {
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
+ return this.<Result> newCaller(append, rpcTimeoutNs)
+ .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
+ controller, loc, stub, append, RequestConverter::buildMutateRequest,
+ RawAsyncTableImpl::toResult))
+ .call();
+ }, "AsyncTable.append", tableName);
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
- long nonceGroup = conn.getNonceGenerator().getNonceGroup();
- long nonce = conn.getNonceGenerator().newNonce();
- return this.<Result> newCaller(increment, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
- controller, loc, stub, increment, RequestConverter::buildMutateRequest,
- RawAsyncTableImpl::toResult))
- .call();
+ return tracedFuture(() -> {
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
+ return this.<Result> newCaller(increment, rpcTimeoutNs)
+ .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
+ controller, loc, stub, increment, RequestConverter::buildMutateRequest,
+ RawAsyncTableImpl::toResult))
+ .call();
+ }, "AsyncTable.increment", tableName);
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -357,39 +367,43 @@ class RawAsyncTableImpl implements AsyncTable {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
- stub, put,
- (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
- null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
- (c, r) -> r.getProcessed()))
- .call();
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
+ (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ (c, r) -> r.getProcessed()))
+ .call(),
+ "AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
- loc, stub, delete,
- (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
- null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
- (c, r) -> r.getProcessed()))
- .call();
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ (c, r) -> r.getProcessed()))
+ .call(),
+ "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
- rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
- loc, stub, mutation,
- (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
- null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
- CheckAndMutateResult::isSuccess))
- .call();
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this
+ .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
+ mutation,
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ CheckAndMutateResult::isSuccess))
+ .call(),
+ "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
}
}
@@ -421,37 +435,42 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
- .call();
+ .call(),
+ "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
- loc, stub, delete,
- (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
- filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
- (c, r) -> r.getProcessed()))
- .call();
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
+ timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ (c, r) -> r.getProcessed()))
+ .call(),
+ "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
- rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
- loc, stub, mutation,
- (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
- filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
- CheckAndMutateResult::isSuccess))
- .call();
+ return tracedFuture(
+ () -> RawAsyncTableImpl.this
+ .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
+ mutation,
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
+ timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ CheckAndMutateResult::isSuccess))
+ .call(),
+ "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
}
}
@@ -462,63 +481,69 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
- if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
- || checkAndMutate.getAction() instanceof Increment
- || checkAndMutate.getAction() instanceof Append) {
- Mutation mutation = (Mutation) checkAndMutate.getAction();
- if (mutation instanceof Put) {
- validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+ return tracedFuture(() -> {
+ if (checkAndMutate.getAction() instanceof Put ||
+ checkAndMutate.getAction() instanceof Delete ||
+ checkAndMutate.getAction() instanceof Increment ||
+ checkAndMutate.getAction() instanceof Append) {
+ Mutation mutation = (Mutation) checkAndMutate.getAction();
+ if (mutation instanceof Put) {
+ validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+ }
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
+ return RawAsyncTableImpl.this
+ .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
+ rpcTimeoutNs)
+ .action(
+ (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
+ (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
+ checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
+ (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
+ .call();
+ } else if (checkAndMutate.getAction() instanceof RowMutations) {
+ RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
+ validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
+ return RawAsyncTableImpl.this
+ .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
+ rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this
+ .<CheckAndMutateResult, CheckAndMutateResult> mutateRow(controller, loc, stub,
+ rowMutations,
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
+ checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
+ resp -> resp))
+ .call();
+ } else {
+ CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
+ future.completeExceptionally(new DoNotRetryIOException(
+ "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
+ return future;
}
- long nonceGroup = conn.getNonceGenerator().getNonceGroup();
- long nonce = conn.getNonceGenerator().newNonce();
- return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
- mutation.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
- loc, stub, mutation,
- (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
- checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
- checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
- checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
- (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
- .call();
- } else if (checkAndMutate.getAction() instanceof RowMutations) {
- RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
- validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
- long nonceGroup = conn.getNonceGenerator().getNonceGroup();
- long nonce = conn.getNonceGenerator().newNonce();
- return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
- rowMutations.getMaxPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) ->
- RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
- controller, loc, stub, rowMutations,
- (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
- checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
- checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
- checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
- resp -> resp))
- .call();
- } else {
- CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
- future.completeExceptionally(new DoNotRetryIOException(
- "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
- return future;
- }
+ }, "AsyncTable.checkAndMutate", tableName);
}
@Override
- public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
- List<CheckAndMutate> checkAndMutates) {
- return batch(checkAndMutates, rpcTimeoutNs).stream()
- .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList());
+ public List<CompletableFuture<CheckAndMutateResult>>
+ checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+ return tracedFutures(
+ () -> batch(checkAndMutates, rpcTimeoutNs).stream()
+ .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
+ "AsyncTable.checkAndMutateList", tableName);
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
@SuppressWarnings("unchecked")
private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
- Converter<MultiRequest, byte[], RowMutations> reqConvert,
- Function<RES, RESP> respConverter) {
+ HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
+ Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
try {
byte[] regionName = loc.getRegion().getRegionName();
@@ -537,12 +562,12 @@ class RawAsyncTableImpl implements AsyncTable {
loc.getServerName(), multiResp);
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
- future.completeExceptionally(ex instanceof IOException ? ex
- : new IOException(
+ future.completeExceptionally(ex instanceof IOException ? ex :
+ new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else {
- future.complete(respConverter
- .apply((RES) multiResp.getResults().get(regionName).result.get(0)));
+ future.complete(
+ respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
}
} catch (IOException e) {
future.completeExceptionally(e);
@@ -561,12 +586,14 @@ class RawAsyncTableImpl implements AsyncTable {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
- return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
- writeRpcTimeoutNs).action((controller, loc, stub) ->
- this.<Result, Result> mutateRow(controller, loc, stub, mutations,
- (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
+ return tracedFuture(
+ () -> this
+ .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
+ .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
+ mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
- .call();
+ .call(),
+ "AsyncTable.mutateRow", tableName);
}
private Scan setDefaultScanConfig(Scan scan) {
@@ -602,46 +629,48 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
- CompletableFuture<List<Result>> future = new CompletableFuture<>();
- List<Result> scanResults = new ArrayList<>();
- scan(scan, new AdvancedScanResultConsumer() {
+ return tracedFuture(() -> {
+ CompletableFuture<List<Result>> future = new CompletableFuture<>();
+ List<Result> scanResults = new ArrayList<>();
+ scan(scan, new AdvancedScanResultConsumer() {
- @Override
- public void onNext(Result[] results, ScanController controller) {
- scanResults.addAll(Arrays.asList(results));
- }
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ scanResults.addAll(Arrays.asList(results));
+ }
- @Override
- public void onError(Throwable error) {
- future.completeExceptionally(error);
- }
+ @Override
+ public void onError(Throwable error) {
+ future.completeExceptionally(error);
+ }
- @Override
- public void onComplete() {
- future.complete(scanResults);
- }
- });
- return future;
+ @Override
+ public void onComplete() {
+ future.complete(scanResults);
+ }
+ });
+ return future;
+ }, "AsyncTable.scanAll", tableName);
}
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
- return batch(gets, readRpcTimeoutNs);
+ return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
}
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
- return voidMutate(puts);
+ return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
}
@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
- return voidMutate(deletes);
+ return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
}
@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
- return batch(actions, rpcTimeoutNs);
+ return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
}
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
@@ -698,7 +727,7 @@ class RawAsyncTableImpl implements AsyncTable {
}
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
+ ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
@@ -716,7 +745,7 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, byte[] row) {
+ ServiceCaller<S, R> callable, byte[] row) {
return coprocessorService(stubMaker, callable, null, row);
}
@@ -738,9 +767,9 @@ class RawAsyncTableImpl implements AsyncTable {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
- byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
- AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+ byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+ AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
if (error != null) {
callback.onError(error);
return;
@@ -769,7 +798,7 @@ class RawAsyncTableImpl implements AsyncTable {
}
private final class CoprocessorServiceBuilderImpl<S, R>
- implements CoprocessorServiceBuilder<S, R> {
+ implements CoprocessorServiceBuilder<S, R> {
private final Function<RpcChannel, S> stubMaker;
@@ -786,7 +815,7 @@ class RawAsyncTableImpl implements AsyncTable {
private boolean endKeyInclusive;
public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
this.callable = Preconditions.checkNotNull(callable, "callable is null");
this.callback = Preconditions.checkNotNull(callback, "callback is null");
@@ -823,8 +852,8 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
- Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
- CoprocessorCallback<R> callback) {
+ Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+ CoprocessorCallback<R> callback) {
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
}
}
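The tracedFutures variant used for the list-returning methods above must keep a single span open until every per-operation future completes. A minimal sketch of that pattern, assuming only the stock OpenTelemetry and java.util.concurrent APIs (the real TraceUtil may differ; the tracer name is hypothetical):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class TracedFuturesSketch {
  public static <T> List<CompletableFuture<T>> tracedFutures(
      Supplier<List<CompletableFuture<T>>> action, String spanName) {
    Span span = GlobalOpenTelemetry.getTracer("hbase-client-sketch")
        .spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      List<CompletableFuture<T>> futures = action.get();
      // End the span only after every per-operation future has completed.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .whenComplete((v, error) -> {
            span.setStatus(error == null ? StatusCode.OK : StatusCode.ERROR);
            span.end();
          });
      return futures;
    }
  }
}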
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 9117fefa4ba..b671095c4e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -393,10 +393,11 @@ public abstract class AbstractRpcClient implements RpcC
}
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
- final Message param, Message returnType, final User ticket,
- final Address addr, final RpcCallback<Message> callback) {
- Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
- .startSpan();
+ final Message param, Message returnType, final User ticket, final Address addr,
+ final RpcCallback<Message> callback) {
+ Span span = TraceUtil.createSpan("RpcClient.callMethod")
+ .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
+ .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName());
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
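The change above trades the span name "RpcClient.callMethod.<fullMethodName>" for a fixed name plus service and method attributes, which keeps span-name cardinality low while a tracing backend can still group or filter by either attribute. A minimal sketch of the approach; the attribute name strings are illustrative assumptions, not necessarily TraceUtil's:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;

public final class RpcSpanSketch {
  // Hypothetical key names; the real TraceUtil constants may differ.
  static final AttributeKey<String> RPC_SERVICE_KEY = AttributeKey.stringKey("rpc.service");
  static final AttributeKey<String> RPC_METHOD_KEY = AttributeKey.stringKey("rpc.method");

  static Span rpcSpan(String service, String method) {
    // One low-cardinality span name; the variable parts become attributes.
    return GlobalOpenTelemetry.getTracer("hbase-client-sketch")
        .spanBuilder("RpcClient.callMethod")
        .setAttribute(RPC_SERVICE_KEY, service)
        .setAttribute(RPC_METHOD_KEY, method)
        .startSpan();
  }
}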
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
new file mode 100644
index 00000000000..708ae4caa23
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncRegionLocatorTracing {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class);
+
+ private static Configuration CONF = HBaseConfiguration.create();
+
+ private AsyncConnectionImpl conn;
+
+ private RegionLocations locs;
+
+ @Rule
+ public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
+ @Before
+ public void setUp() throws IOException {
+ RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build();
+ locs = new RegionLocations(
+ new HRegionLocation(metaRegionInfo,
+ ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis())),
+ new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 1),
+ ServerName.valueOf("127.0.0.2", 12345, System.currentTimeMillis())),
+ new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 2),
+ ServerName.valueOf("127.0.0.3", 12345, System.currentTimeMillis())));
+ conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) {
+
+ @Override
+ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+ return CompletableFuture.completedFuture(locs);
+ }
+ }, "test", UserProvider.instantiate(CONF).getCurrent());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(conn, true);
+ }
+
+ private SpanData waitSpan(String name) {
+ Waiter.waitFor(CONF, 1000,
+ () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
+ return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+ }
+
+ @Test
+ public void testClearCache() {
+ conn.getLocator().clearCache();
+ SpanData span = waitSpan("AsyncRegionLocator.clearCache");
+ assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ }
+
+ @Test
+ public void testClearCacheServerName() {
+ ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis());
+ conn.getLocator().clearCache(sn);
+ SpanData span = waitSpan("AsyncRegionLocator.clearCache");
+ assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ assertEquals(sn.toString(), span.getAttributes().get(TraceUtil.SERVER_NAME_KEY));
+ }
+
+ @Test
+ public void testClearCacheTableName() {
+ conn.getLocator().clearCache(TableName.META_TABLE_NAME);
+ SpanData span = waitSpan("AsyncRegionLocator.clearCache");
+ assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
+ span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
+ assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
+ span.getAttributes().get(TraceUtil.TABLE_KEY));
+ }
+
+ @Test
+ public void testGetRegionLocation() {
+ conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
+ RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join();
+ SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation");
+ assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
+ span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
+ assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
+ span.getAttributes().get(TraceUtil.TABLE_KEY));
+ List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
+ assertEquals(1, regionNames.size());
+ assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(),
+ regionNames.get(0));
+ }
+
+ @Test
+ public void testGetRegionLocations() {
+ conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
+ RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join();
+ SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations");
+ assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
+ span.getAttributes().get(TraceUtil.NAMESPACE_KEY));
+ assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
+ span.getAttributes().get(TraceUtil.TABLE_KEY));
+ List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
+ assertEquals(3, regionNames.size());
+ for (int i = 0; i < 3; i++) {
+ assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(),
+ regionNames.get(i));
+ }
+ }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
new file mode 100644
index 00000000000..07cdf0e5383
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import io.opentelemetry.api.trace.Span.Kind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncTableTracing {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
+
+ private static Configuration CONF = HBaseConfiguration.create();
+
+ private ClientService.Interface stub;
+
+ private AsyncConnection conn;
+
+ private AsyncTable<?> table;
+
+ @Rule
+ public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
+ @Before
+ public void setUp() throws IOException {
+ stub = mock(ClientService.Interface.class);
+ AtomicInteger scanNextCalled = new AtomicInteger(0);
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ScanRequest req = invocation.getArgument(1);
+ RpcCallback<ScanResponse> done = invocation.getArgument(2);
+ if (!req.hasScannerId()) {
+ done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
+ .setMoreResultsInRegion(true).setMoreResults(true).build());
+ } else {
+ if (req.hasCloseScanner() && req.getCloseScanner()) {
+ done.run(ScanResponse.getDefaultInstance());
+ } else {
+ Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
+ .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
+ .setValue(Bytes.toBytes("v")).build();
+ Result result = Result.create(Arrays.asList(cell));
+ ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
+ .addResults(ProtobufUtil.toResult(result));
+ if (req.getLimitOfRows() == 1) {
+ builder.setMoreResultsInRegion(false).setMoreResults(false);
+ } else {
+ builder.setMoreResultsInRegion(true).setMoreResults(true);
+ }
+ ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
+ }
+ }
+ return null;
+ }
+ }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ClientProtos.MultiResponse resp =
+ ClientProtos.MultiResponse.newBuilder()
+ .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
+ ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
+ .build();
+ RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
+ ForkJoinPool.commonPool().execute(() -> done.run(resp));
+ return null;
+ }
+ }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
+ MutateResponse resp;
+ switch (req.getMutateType()) {
+ case INCREMENT:
+ ColumnValue value = req.getColumnValue(0);
+ QualifierValue qvalue = value.getQualifierValue(0);
+ Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setType(Cell.Type.Put).setRow(req.getRow().toByteArray())
+ .setFamily(value.getFamily().toByteArray())
+ .setQualifier(qvalue.getQualifier().toByteArray())
+ .setValue(qvalue.getValue().toByteArray()).build();
+ resp = MutateResponse.newBuilder()
+ .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
+ break;
+ default:
+ resp = MutateResponse.getDefaultInstance();
+ break;
+ }
+ RpcCallback<MutateResponse> done = invocation.getArgument(2);
+ ForkJoinPool.commonPool().execute(() -> done.run(resp));
+ return null;
+ }
+ }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ RpcCallback<GetResponse> done = invocation.getArgument(2);
+ ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
+ return null;
+ }
+ }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
+ conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test",
+ UserProvider.instantiate(CONF).getCurrent()) {
+
+ @Override
+ AsyncRegionLocator getLocator() {
+ AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
+ Answer<CompletableFuture<HRegionLocation>> answer =
+ new Answer<CompletableFuture<HRegionLocation>>() {
+
+ @Override
+ public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
+ throws Throwable {
+ TableName tableName = invocation.getArgument(0);
+ RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+ ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
+ HRegionLocation loc = new HRegionLocation(info, serverName);
+ return CompletableFuture.completedFuture(loc);
+ }
+ };
+ doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+ any(RegionLocateType.class), anyLong());
+ doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+ anyInt(), any(RegionLocateType.class), anyLong());
+ return locator;
+ }
+
+ @Override
+ ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
+ return stub;
+ }
+ };
+ table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(conn, true);
+ }
+
+ private void assertTrace(String methodName) {
+ Waiter.waitFor(CONF, 1000,
+ () -> traceRule.getSpans().stream()
+ .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) &&
+ span.getKind() == Kind.INTERNAL && span.hasEnded()));
+ SpanData data = traceRule.getSpans().stream()
+ .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get();
+ assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
+ TableName tableName = table.getName();
+ assertEquals(tableName.getNamespaceAsString(),
+ data.getAttributes().get(TraceUtil.NAMESPACE_KEY));
+ assertEquals(tableName.getNameAsString(), data.getAttributes().get(TraceUtil.TABLE_KEY));
+ }
+
+ @Test
+ public void testExists() {
+ table.exists(new Get(Bytes.toBytes(0))).join();
+ assertTrace("get");
+ }
+
+ @Test
+ public void testGet() {
+ table.get(new Get(Bytes.toBytes(0))).join();
+ assertTrace("get");
+ }
+
+ @Test
+ public void testPut() {
+ table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+ Bytes.toBytes("v"))).join();
+ assertTrace("put");
+ }
+
+ @Test
+ public void testDelete() {
+ table.delete(new Delete(Bytes.toBytes(0))).join();
+ assertTrace("delete");
+ }
+
+ @Test
+ public void testAppend() {
+ table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+ Bytes.toBytes("v"))).join();
+ assertTrace("append");
+ }
+
+ @Test
+ public void testIncrement() {
+ table
+ .increment(
+ new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
+ .join();
+ assertTrace("increment");
+ }
+
+ @Test
+ public void testIncrementColumnValue1() {
+ table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
+ .join();
+ assertTrace("increment");
+ }
+
+ @Test
+ public void testIncrementColumnValue2() {
+ table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
+ Durability.ASYNC_WAL).join();
+ assertTrace("increment");
+ }
+
+ @Test
+ public void testCheckAndMutate() {
+ table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
+ .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
+ .build(new Delete(Bytes.toBytes(0)))).join();
+ assertTrace("checkAndMutate");
+ }
+
+ @Test
+ public void testCheckAndMutateList() {
+ CompletableFuture
+ .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
+ .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
+ .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("checkAndMutateList");
+ }
+
+ @Test
+ public void testCheckAndMutateAll() {
+ table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
+ .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
+ .build(new Delete(Bytes.toBytes(0))))).join();
+ assertTrace("checkAndMutateList");
+ }
+
+ @Test
+ public void testMutateRow() throws Exception {
+ byte[] row = Bytes.toBytes(0);
+ RowMutations mutation = new RowMutations(row);
+ mutation.add(new Delete(row));
+ table.mutateRow(mutation).get();
+ assertTrace("mutateRow");
+ }
+
+ @Test
+ public void testScanAll() throws IOException {
+ table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
+ assertTrace("scanAll");
+ }
+
+ @Test
+ public void testExistsList() {
+ CompletableFuture
+ .allOf(
+ table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("getList");
+ }
+
+ @Test
+ public void testExistsAll() {
+ table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
+ assertTrace("getList");
+ }
+
+ @Test
+ public void testGetList() {
+ CompletableFuture
+ .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("getList");
+ }
+
+ @Test
+ public void testGetAll() {
+ table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
+ assertTrace("getList");
+ }
+
+ @Test
+ public void testPutList() {
+ CompletableFuture
+ .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+ Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("putList");
+ }
+
+ @Test
+ public void testPutAll() {
+ table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+ Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
+ assertTrace("putList");
+ }
+
+ @Test
+ public void testDeleteList() {
+ CompletableFuture
+ .allOf(
+ table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("deleteList");
+ }
+
+ @Test
+ public void testDeleteAll() {
+ table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+ assertTrace("deleteList");
+ }
+
+ @Test
+ public void testBatch() {
+ CompletableFuture
+ .allOf(
+ table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
+ .join();
+ assertTrace("batch");
+ }
+
+ @Test
+ public void testBatchAll() {
+ table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+ assertTrace("batch");
+ }
+
+ @Test
+ public void testConnClose() throws IOException {
+ conn.close();
+ Waiter.waitFor(CONF, 1000,
+ () -> traceRule.getSpans().stream()
+ .anyMatch(span -> span.getName().equals("AsyncConnection.close") &&
+ span.getKind() == Kind.INTERNAL && span.hasEnded()));
+ SpanData data = traceRule.getSpans().stream()
+ .filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get();
+ assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index 768de9c3b9c..d0da0710313 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -18,7 +18,19 @@
package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Span.Kind;
+import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.attributes.SemanticAttributes;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -26,10 +38,132 @@ public final class TraceUtil {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase";
+ public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
+
+ public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
+
+ public static final AttributeKey<List<String>> REGION_NAMES_KEY =
+ AttributeKey.stringArrayKey("db.hbase.regions");
+
+ public static final AttributeKey<String> RPC_SERVICE_KEY =
+ AttributeKey.stringKey("db.hbase.rpc.service");
+
+ public static final AttributeKey<String> RPC_METHOD_KEY =
+ AttributeKey.stringKey("db.hbase.rpc.method");
+
+ public static final AttributeKey<String> SERVER_NAME_KEY =
+ AttributeKey.stringKey("db.hbase.server.name");
+
private TraceUtil() {
}
public static Tracer getGlobalTracer() {
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
}
+
+ /**
+ * Create a {@link Kind#INTERNAL} span.
+ */
+ public static Span createSpan(String name) {
+ return createSpan(name, Kind.INTERNAL);
+ }
+
+ /**
+ * Create a {@link Kind#INTERNAL} span and set table-related attributes.
+ */
+ public static Span createTableSpan(String spanName, TableName tableName) {
+ return createSpan(spanName).setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString())
+ .setAttribute(TABLE_KEY, tableName.getNameAsString());
+ }
+
+ /**
+ * Create a span with the given {@code kind}. Notice that OpenTelemetry only expects one
+ * {@link Kind#CLIENT} span and one {@link Kind#SERVER} span for a traced request, so use this
+ * with caution if you want to create spans with a kind other than {@link Kind#INTERNAL}.
+ */
+ private static Span createSpan(String name, Kind kind) {
+ return getGlobalTracer().spanBuilder(name).setSpanKind(kind).startSpan();
+ }
+
+ /**
+ * Create a span whose parent comes from a remote process, i.e., was passed in through RPC.
+ *
+ * We set the kind of the returned span to {@link Kind#SERVER}, as this should be the topmost
+ * span on the server side.
+ */
+ public static Span createRemoteSpan(String name, Context ctx) {
+ return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan();
+ }
+
+ /**
+ * Trace an asynchronous operation for a table.
+ */
+ public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
+ String spanName, TableName tableName) {
+ Span span = createTableSpan(spanName, tableName);
+ try (Scope scope = span.makeCurrent()) {
+ CompletableFuture<T> future = action.get();
+ endSpan(future, span);
+ return future;
+ }
+ }
+
+ /**
+ * Trace an asynchronous operation.
+ */
+ public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
+ String spanName) {
+ Span span = createSpan(spanName);
+ try (Scope scope = span.makeCurrent()) {
+ CompletableFuture<T> future = action.get();
+ endSpan(future, span);
+ return future;
+ }
+ }
+
+ /**
+ * Trace an asynchronous operation, and finish the created {@link Span} when all the given
+ * {@code futures} are completed.
+ */
+ public static <T> List<CompletableFuture<T>> tracedFutures(
+ Supplier<List<CompletableFuture<T>>> action, String spanName, TableName tableName) {
+ Span span = createTableSpan(spanName, tableName);
+ try (Scope scope = span.makeCurrent()) {
+ List<CompletableFuture<T>> futures = action.get();
+ endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
+ return futures;
+ }
+ }
+
+ /**
+ * Finish the {@code span} when the given {@code future} is completed.
+ */
+ private static void endSpan(CompletableFuture<?> future, Span span) {
+ FutureUtils.addListener(future, (resp, error) -> {
+ if (error != null) {
+ span.recordException(error);
+ span.setStatus(StatusCode.ERROR);
+ } else {
+ span.setStatus(StatusCode.OK);
+ }
+ span.end();
+ });
+ }
+
+ public static void trace(Runnable action, String spanName) {
+ trace(action, () -> createSpan(spanName));
+ }
+
+ public static void trace(Runnable action, Supplier<Span> creator) {
+ Span span = creator.get();
+ try (Scope scope = span.makeCurrent()) {
+ action.run();
+ span.setStatus(StatusCode.OK);
+ } catch (Throwable e) {
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR);
+ } finally {
+ span.end();
+ }
+ }
}
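
For orientation, this is how the new helpers are meant to compose at a call site. A minimal caller-side sketch, not part of the patch: SketchCaller and pendingGet are hypothetical, while the span names mirror the ones asserted in the tests above.

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.trace.TraceUtil;

class SketchCaller {
  // Wraps an async get: the span is created up front, made current while the
  // supplier runs, and only ended (OK or ERROR) once the returned future completes.
  CompletableFuture<Result> tracedGet(CompletableFuture<Result> pendingGet, TableName tableName) {
    return TraceUtil.tracedFuture(() -> pendingGet, "AsyncTable.get", tableName);
  }

  // Synchronous work uses the Runnable overload; the span is ended in a finally block.
  void tracedClearCache(Runnable clearCache) {
    TraceUtil.trace(clearCache, "AsyncRegionLocator.clearCache");
  }
}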
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 203f079b4bd..7cc1d2bbee3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -122,9 +122,10 @@ public class CallRunner {
RpcServer.CurCall.set(call);
String serviceName = getServiceName();
String methodName = getMethodName();
- String traceString = serviceName + "." + methodName;
- Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
- .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
+ Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
+ .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
+ .setAttribute(TraceUtil.RPC_SERVICE_KEY, serviceName)
+ .setAttribute(TraceUtil.RPC_METHOD_KEY, methodName);
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index db7f052ca48..823005b2c52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -629,8 +629,7 @@ abstract class ServerRpcConnection implements Closeable {
};
Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), header.getTraceInfo(), getter);
- Span span =
- TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan();
+ Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
try (Scope scope = span.makeCurrent()) {
int id = header.getCallId();
if (RpcServer.LOG.isTraceEnabled()) {
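
The extract() above is the server half of context propagation; the client half injects the current context into the call header before the request is sent. A generic sketch of the inject side, assuming the OpenTelemetry 0.13 API and a plain map as the carrier (the patch itself carries these entries in the RPC header proto):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import java.util.HashMap;
import java.util.Map;

class PropagationSketch {
  // Fills the carrier with e.g. the W3C "traceparent" entry for the current span.
  // ServerRpcConnection's extract() is the mirror image, and createRemoteSpan()
  // then parents its Kind#SERVER span on the extracted context.
  static Map<String, String> injectCurrentContext() {
    Map<String, String> carrier = new HashMap<>();
    GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
        .inject(Context.current(), carrier, (map, key, value) -> map.put(key, value));
    return carrier;
  }
}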
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 11978ca6c8e..4aca7640031 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
@@ -448,6 +449,19 @@ public abstract class AbstractTestIPC {
}
}
+ private SpanData waitSpan(String name) {
+ Waiter.waitFor(CONF, 1000,
+ () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
+ return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+ }
+
+ private void assertRpcAttribute(SpanData data, String methodName) {
+ assertEquals(SERVICE.getDescriptorForType().getName(),
+ data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
+ assertEquals(methodName,
+ data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
+ }
+
@Test
public void testTracing() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
@@ -457,9 +471,8 @@ public abstract class AbstractTestIPC {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
- Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
- .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause")));
-
+ assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause");
+ assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause");
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertThat(
@@ -471,9 +484,8 @@ public abstract class AbstractTestIPC {
traceRule.clearSpans();
assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
- Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
- .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error")));
-
+ assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error");
+ assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error");
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
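
assertSameTraceId() is defined outside this hunk; its expected shape, for readers following along (an assumption based on what these assertions require), is that every recorded span carries the trace id of the first one:

private void assertSameTraceId() {
  String traceId = traceRule.getSpans().get(0).getTraceId();
  for (SpanData data : traceRule.getSpans()) {
    // Spans on both the client and server side of the call belong to one trace.
    assertEquals(traceId, data.getTraceId());
  }
}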
diff --git a/pom.xml b/pom.xml
index 95bb7121d22..9fcf86b648a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1484,7 +1484,6 @@
<junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<opentelemetry.version>0.13.1</opentelemetry.version>
- <opentelemetry-instrumentation.version>0.13.0</opentelemetry-instrumentation.version>
<log4j.version>1.2.17</log4j.version>
<mockito-core.version>2.28.2</mockito-core.version>
@@ -2193,7 +2192,7 @@
<groupId>io.opentelemetry.javaagent</groupId>
<artifactId>opentelemetry-javaagent</artifactId>
- <version>${opentelemetry-instrumentation.version}</version>
+ <version>${opentelemetry.version}</version>
<classifier>all</classifier>