HBASE-26474 Implement connection-level attributes (#4014)
Add support for `db.system`, `db.connection_string`, `db.user`. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Huaxiang Sun <huaxiangsun@apache.org> Co-authored-by: Josh Elser <josh.elser@gmail.com>
This commit is contained in:
parent
5d14589314
commit
0ee15e0865
|
@ -272,6 +272,11 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
|
|||
getClass().getSimpleName() + ".getActiveMaster");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return "unimplemented";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
trace(() -> {
|
||||
|
|
|
@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
|
|||
* The implementation of AsyncConnection.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class AsyncConnectionImpl implements AsyncConnection {
|
||||
public class AsyncConnectionImpl implements AsyncConnection {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
|
||||
|
||||
|
@ -191,6 +191,14 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
return choreService;
|
||||
}
|
||||
|
||||
public User getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public ConnectionRegistry getConnectionRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -20,24 +20,27 @@ package org.apache.hadoop.hbase.client;
|
|||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
|
||||
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
|
||||
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -95,9 +98,12 @@ class AsyncRegionLocator {
|
|||
return TableName.isMetaTableName(tableName);
|
||||
}
|
||||
|
||||
private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
|
||||
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
|
||||
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
|
||||
private <T> CompletableFuture<T> tracedLocationFuture(
|
||||
Supplier<CompletableFuture<T>> action,
|
||||
Function<T, List<String>> getRegionNames,
|
||||
Supplier<Span> spanSupplier
|
||||
) {
|
||||
final Span span = spanSupplier.get();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
CompletableFuture<T> future = action.get();
|
||||
FutureUtils.addListener(future, (resp, error) -> {
|
||||
|
@ -116,18 +122,30 @@ class AsyncRegionLocator {
|
|||
}
|
||||
}
|
||||
|
||||
private List<String> getRegionName(RegionLocations locs) {
|
||||
List<String> names = new ArrayList<>();
|
||||
for (HRegionLocation loc : locs.getRegionLocations()) {
|
||||
if (loc != null) {
|
||||
names.add(loc.getRegion().getRegionNameAsString());
|
||||
}
|
||||
static List<String> getRegionNames(RegionLocations locs) {
|
||||
if (locs == null || locs.getRegionLocations() == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return names;
|
||||
return Arrays.stream(locs.getRegionLocations())
|
||||
.filter(Objects::nonNull)
|
||||
.map(HRegionLocation::getRegion)
|
||||
.map(RegionInfo::getRegionNameAsString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
static List<String> getRegionNames(HRegionLocation location) {
|
||||
return Optional.ofNullable(location)
|
||||
.map(HRegionLocation::getRegion)
|
||||
.map(RegionInfo::getRegionNameAsString)
|
||||
.map(Collections::singletonList)
|
||||
.orElseGet(Collections::emptyList);
|
||||
}
|
||||
|
||||
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
|
||||
RegionLocateType type, boolean reload, long timeoutNs) {
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(conn)
|
||||
.setName("AsyncRegionLocator.getRegionLocations")
|
||||
.setTableName(tableName);
|
||||
return tracedLocationFuture(() -> {
|
||||
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
|
||||
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
|
||||
|
@ -137,11 +155,14 @@ class AsyncRegionLocator {
|
|||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
|
||||
"ms) waiting for region locations for " + tableName + ", row='" +
|
||||
Bytes.toStringBinary(row) + "'");
|
||||
}, this::getRegionName, tableName, "getRegionLocations");
|
||||
}, AsyncRegionLocator::getRegionNames, supplier);
|
||||
}
|
||||
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(conn)
|
||||
.setName("AsyncRegionLocator.getRegionLocation")
|
||||
.setTableName(tableName);
|
||||
return tracedLocationFuture(() -> {
|
||||
// meta region can not be split right now so we always call the same method.
|
||||
// Change it later if the meta table can have more than one regions.
|
||||
|
@ -172,8 +193,7 @@ class AsyncRegionLocator {
|
|||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
|
||||
"ms) waiting for region location for " + tableName + ", row='" +
|
||||
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
|
||||
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
|
||||
"getRegionLocation");
|
||||
}, AsyncRegionLocator::getRegionNames, supplier);
|
||||
}
|
||||
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
|
@ -201,6 +221,9 @@ class AsyncRegionLocator {
|
|||
}
|
||||
|
||||
void clearCache(TableName tableName) {
|
||||
Supplier<Span> supplier = new TableSpanBuilder(conn)
|
||||
.setName("AsyncRegionLocator.clearCache")
|
||||
.setTableName(tableName);
|
||||
TraceUtil.trace(() -> {
|
||||
LOG.debug("Clear meta cache for {}", tableName);
|
||||
if (tableName.equals(META_TABLE_NAME)) {
|
||||
|
@ -208,24 +231,28 @@ class AsyncRegionLocator {
|
|||
} else {
|
||||
nonMetaRegionLocator.clearCache(tableName);
|
||||
}
|
||||
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
void clearCache(ServerName serverName) {
|
||||
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
|
||||
.setName("AsyncRegionLocator.clearCache")
|
||||
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
|
||||
TraceUtil.trace(() -> {
|
||||
LOG.debug("Clear meta cache for {}", serverName);
|
||||
metaRegionLocator.clearCache(serverName);
|
||||
nonMetaRegionLocator.clearCache(serverName);
|
||||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
|
||||
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
|
||||
serverName.getServerName()));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
void clearCache() {
|
||||
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
|
||||
.setName("AsyncRegionLocator.clearCache");
|
||||
TraceUtil.trace(() -> {
|
||||
metaRegionLocator.clearCache();
|
||||
nonMetaRegionLocator.clearCache();
|
||||
}, "AsyncRegionLocator.clearCache");
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
|
@ -175,8 +176,7 @@ public interface ClusterConnection extends Connection {
|
|||
* question
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
HRegionLocation locateRegion(final byte[] regionName)
|
||||
throws IOException;
|
||||
HRegionLocation locateRegion(final byte[] regionName) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the locations of all regions in the specified table, <i>tableName</i>.
|
||||
|
@ -335,4 +335,14 @@ public interface ClusterConnection extends Connection {
|
|||
* Get the bootstrap node list of another region server.
|
||||
*/
|
||||
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the {@link User} associated with this connection. May be {@code null}.
|
||||
*/
|
||||
User getUser();
|
||||
|
||||
/**
|
||||
* Get the {@link ConnectionRegistry} used to orient this cluster.
|
||||
*/
|
||||
ConnectionRegistry getConnectionRegistry();
|
||||
}
|
||||
|
|
|
@ -166,7 +166,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
|
|||
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
|
||||
justification="Access to the conncurrent hash map is under a lock so should be fine.")
|
||||
@InterfaceAudience.Private
|
||||
class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||
public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
|
||||
|
||||
|
@ -513,6 +513,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return this.metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public User getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionRegistry getConnectionRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
private ThreadPoolExecutor getBatchPool() {
|
||||
if (batchPool == null) {
|
||||
synchronized (this) {
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* Internal use only.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
interface ConnectionRegistry extends Closeable {
|
||||
public interface ConnectionRegistry extends Closeable {
|
||||
|
||||
/**
|
||||
* Get the location of meta region(s).
|
||||
|
@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable {
|
|||
*/
|
||||
CompletableFuture<ServerName> getActiveMaster();
|
||||
|
||||
/**
|
||||
* Return the connection string associated with this registry instance. This value is
|
||||
* informational, used for annotating traces. Values returned may not be valid for establishing a
|
||||
* working cluster connection.
|
||||
*/
|
||||
String getConnectionString();
|
||||
|
||||
/**
|
||||
* Closes this instance and releases any system resources associated with it
|
||||
*/
|
||||
|
|
|
@ -18,18 +18,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
/**
|
||||
* An implementation of {@link RegionLocator}. Used to view region location information for a single
|
||||
|
@ -62,24 +72,34 @@ public class HRegionLocator implements RegionLocator {
|
|||
@Override
|
||||
public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload)
|
||||
throws IOException {
|
||||
return TraceUtil.trace(() -> connection.locateRegion(tableName, row, !reload, true, replicaId)
|
||||
.getRegionLocation(replicaId), () -> TraceUtil
|
||||
.createTableSpan(getClass().getSimpleName() + ".getRegionLocation", tableName));
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(connection)
|
||||
.setName("HRegionLocator.getRegionLocation")
|
||||
.setTableName(tableName);
|
||||
return tracedLocationFuture(
|
||||
() -> connection.locateRegion(tableName, row, !reload, true, replicaId)
|
||||
.getRegionLocation(replicaId),
|
||||
AsyncRegionLocator::getRegionNames,
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> getRegionLocations(byte[] row, boolean reload) throws IOException {
|
||||
return TraceUtil.trace(() -> {
|
||||
RegionLocations locs =
|
||||
connection.locateRegion(tableName, row, !reload, true, RegionInfo.DEFAULT_REPLICA_ID);
|
||||
return Arrays.asList(locs.getRegionLocations());
|
||||
}, () -> TraceUtil
|
||||
.createTableSpan(getClass().getSimpleName() + ".getRegionLocations", tableName));
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(connection)
|
||||
.setName("HRegionLocator.getRegionLocations")
|
||||
.setTableName(tableName);
|
||||
final RegionLocations locs = tracedLocationFuture(
|
||||
() -> connection.locateRegion(tableName, row, !reload, true,
|
||||
RegionInfo.DEFAULT_REPLICA_ID),
|
||||
AsyncRegionLocator::getRegionNames, supplier);
|
||||
return Arrays.asList(locs.getRegionLocations());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> getAllRegionLocations() throws IOException {
|
||||
return TraceUtil.trace(() -> {
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(connection)
|
||||
.setName("HRegionLocator.getAllRegionLocations")
|
||||
.setTableName(tableName);
|
||||
return tracedLocationFuture(() -> {
|
||||
ArrayList<HRegionLocation> regions = new ArrayList<>();
|
||||
for (RegionLocations locations : listRegionLocations()) {
|
||||
for (HRegionLocation location : locations.getRegionLocations()) {
|
||||
|
@ -88,15 +108,27 @@ public class HRegionLocator implements RegionLocator {
|
|||
connection.cacheLocation(tableName, locations);
|
||||
}
|
||||
return regions;
|
||||
}, () -> TraceUtil
|
||||
.createTableSpan(getClass().getSimpleName() + ".getAllRegionLocations", tableName));
|
||||
}, HRegionLocator::getRegionNames, supplier);
|
||||
}
|
||||
|
||||
private static List<String> getRegionNames(List<HRegionLocation> locations) {
|
||||
if (CollectionUtils.isEmpty(locations)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return locations.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(AsyncRegionLocator::getRegionNames)
|
||||
.filter(Objects::nonNull)
|
||||
.flatMap(List::stream)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionLocationCache() {
|
||||
TraceUtil.trace(() ->
|
||||
connection.clearRegionCache(tableName), () -> TraceUtil
|
||||
.createTableSpan(this.getClass().getSimpleName() + ".clearRegionLocationCache", tableName));
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(connection)
|
||||
.setName("HRegionLocator.clearRegionLocationCache")
|
||||
.setTableName(tableName);
|
||||
TraceUtil.trace(() -> connection.clearRegionCache(tableName), supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,4 +156,26 @@ public class HRegionLocator implements RegionLocator {
|
|||
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName);
|
||||
return regions;
|
||||
}
|
||||
|
||||
private <T> T tracedLocationFuture(
|
||||
TraceUtil.IOExceptionCallable<T> action,
|
||||
Function<T, List<String>> getRegionNames,
|
||||
Supplier<Span> spanSupplier
|
||||
) throws IOException {
|
||||
final Span span = spanSupplier.get();
|
||||
try (Scope ignored = span.makeCurrent()) {
|
||||
final T result = action.call();
|
||||
final List<String> regionNames = getRegionNames.apply(result);
|
||||
if (!CollectionUtils.isEmpty(regionNames)) {
|
||||
span.setAttribute(REGION_NAMES_KEY, regionNames);
|
||||
}
|
||||
span.setStatus(StatusCode.OK);
|
||||
span.end();
|
||||
return result;
|
||||
} catch (IOException e) {
|
||||
TraceUtil.setError(span, e);
|
||||
span.end();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.google.protobuf.Service;
|
|||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
|
||||
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
|
@ -359,7 +361,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result get(final Get get) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(get);
|
||||
return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
|
||||
|
@ -402,7 +404,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -429,7 +431,7 @@ public class HTable implements Table {
|
|||
@Override
|
||||
public void batch(final List<? extends Row> actions, final Object[] results)
|
||||
throws InterruptedException, IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
|
@ -468,7 +470,7 @@ public class HTable implements Table {
|
|||
.setOperationTimeout(operationTimeoutMs)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
final Span span = new TableOperationSpanBuilder()
|
||||
final Span span = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||
.build();
|
||||
|
@ -507,7 +509,7 @@ public class HTable implements Table {
|
|||
.setRpcTimeout(writeTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
final Span span = new TableOperationSpanBuilder()
|
||||
final Span span = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||
.build();
|
||||
|
@ -525,7 +527,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public void delete(final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(delete);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
|
@ -547,7 +549,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public void delete(final List<Delete> deletes) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
|
@ -573,7 +575,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public void put(final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(put);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
|
@ -596,7 +598,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public void put(final List<Put> puts) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
|
@ -614,7 +616,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result mutateRow(final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -670,7 +672,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result append(final Append append) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(append);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -697,7 +699,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result increment(final Increment increment) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(increment);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -731,7 +733,7 @@ public class HTable implements Table {
|
|||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount, final Durability durability)
|
||||
throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -769,7 +771,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -782,7 +784,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -795,7 +797,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -807,7 +809,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -820,7 +822,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -833,7 +835,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -910,7 +912,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -923,7 +925,7 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -933,7 +935,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(checkAndMutate);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -982,7 +984,7 @@ public class HTable implements Table {
|
|||
@Override
|
||||
public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
|
||||
throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1040,7 +1042,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean exists(final Get get) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(get);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1052,7 +1054,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean[] exists(List<Get> gets) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1108,6 +1110,10 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
final Supplier<Span> supplier = new TableSpanBuilder(connection)
|
||||
.setName("HTable.close")
|
||||
.setTableName(tableName)
|
||||
.setSpanKind(SpanKind.INTERNAL);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
if (this.closed) {
|
||||
return;
|
||||
|
@ -1126,7 +1132,7 @@ public class HTable implements Table {
|
|||
}
|
||||
}
|
||||
this.closed = true;
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".close", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
// validate for well-formedness
|
||||
|
@ -1456,7 +1462,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenPut(Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1469,7 +1475,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1481,7 +1487,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1511,7 +1517,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenPut(Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
|
@ -1523,7 +1529,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
@ -1533,7 +1539,7 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
|
|
|
@ -87,9 +87,12 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
|||
return masterAddrs;
|
||||
}
|
||||
|
||||
private final String connectionString;
|
||||
|
||||
MasterRegistry(Configuration conf) throws IOException {
|
||||
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
|
||||
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
|
||||
connectionString = getConnectionString(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,6 +105,15 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
|||
return getMasters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return connectionString;
|
||||
}
|
||||
|
||||
static String getConnectionString(Configuration conf) throws UnknownHostException {
|
||||
return getMasterAddr(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the default master address end point if it is not specified in the configuration.
|
||||
* <p/>
|
||||
|
|
|
@ -261,7 +261,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
}
|
||||
|
||||
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
|
||||
return new TableOperationSpanBuilder().setTableName(tableName);
|
||||
return new TableOperationSpanBuilder(conn).setTableName(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
|||
|
||||
private static final char ADDRS_CONF_SEPARATOR = ',';
|
||||
|
||||
private final String connectionString;
|
||||
|
||||
RpcConnectionRegistry(Configuration conf) throws IOException {
|
||||
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
|
||||
MIN_SECS_BETWEEN_REFRESHES);
|
||||
connectionString = buildConnectionString(conf);
|
||||
}
|
||||
|
||||
private String buildConnectionString(Configuration conf) throws UnknownHostException {
|
||||
final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
|
||||
if (StringUtils.isBlank(configuredBootstrapNodes)) {
|
||||
return MasterRegistry.getConnectionString(conf);
|
||||
}
|
||||
return Splitter.on(ADDRS_CONF_SEPARATOR)
|
||||
.trimResults()
|
||||
.splitToStream(configuredBootstrapNodes)
|
||||
.collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,6 +106,11 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return connectionString;
|
||||
}
|
||||
|
||||
private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
|
||||
return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
|
||||
.collect(Collectors.toSet());
|
||||
|
|
|
@ -247,6 +247,13 @@ class ZKConnectionRegistry implements ConnectionRegistry {
|
|||
"ZKConnectionRegistry.getActiveMaster");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
final String serverList = zk.getConnectString();
|
||||
final String baseZNode = znodePaths.baseZNode;
|
||||
return serverList + ":" + baseZNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
zk.close();
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_CONNECTION_STRING;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM_VALUE;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_USER;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Construct {@link Span} instances originating from the client side of a connection.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ConnectionSpanBuilder implements Supplier<Span> {
|
||||
|
||||
private String name;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
public ConnectionSpanBuilder(final AsyncConnectionImpl conn) {
|
||||
populateConnectionAttributes(attributes, conn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
public ConnectionSpanBuilder setName(final String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
public <T> ConnectionSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
|
||||
attributes.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Span build() {
|
||||
final SpanBuilder builder = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder(name)
|
||||
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
|
||||
.setSpanKind(SpanKind.CLIENT);
|
||||
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
|
||||
return builder.startSpan();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #populateConnectionAttributes(Map, AsyncConnectionImpl)
|
||||
*/
|
||||
static void populateConnectionAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final ClusterConnection conn
|
||||
) {
|
||||
attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
|
||||
attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
|
||||
attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
|
||||
.map(Object::toString)
|
||||
.orElse(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method that performs the primary logic of this builder. It is visible to other
|
||||
* classes in this package so that other builders can use this functionality as a mix-in.
|
||||
* @param attributes the attributes map to be populated.
|
||||
* @param conn the source of attribute values.
|
||||
*/
|
||||
static void populateConnectionAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final AsyncConnectionImpl conn
|
||||
) {
|
||||
attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
|
||||
attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
|
||||
attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
|
||||
.map(Object::toString)
|
||||
.orElse(null));
|
||||
}
|
||||
}
|
|
@ -18,10 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
|
@ -32,7 +29,9 @@ import java.util.Map;
|
|||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
|
@ -46,8 +45,8 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Construct {@link io.opentelemetry.api.trace.Span} instances originating from
|
||||
* "table operations" -- the verbs in our public API that interact with data in tables.
|
||||
* Construct {@link Span} instances originating from "table operations" -- the verbs in our public
|
||||
* API that interact with data in tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableOperationSpanBuilder implements Supplier<Span> {
|
||||
|
@ -60,7 +59,16 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
|
|||
private TableName tableName;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
@Override public Span get() {
|
||||
public TableOperationSpanBuilder(final ClusterConnection conn) {
|
||||
ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
|
||||
}
|
||||
|
||||
public TableOperationSpanBuilder(final AsyncConnectionImpl conn) {
|
||||
ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
|
@ -84,9 +92,7 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
|
|||
|
||||
public TableOperationSpanBuilder setTableName(final TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
|
||||
attributes.put(DB_NAME, tableName.getNamespaceAsString());
|
||||
attributes.put(TABLE_KEY, tableName.getNameAsString());
|
||||
TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Construct {@link Span} instances involving data tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableSpanBuilder implements Supplier<Span> {
|
||||
|
||||
private String name;
|
||||
private SpanKind spanKind = SpanKind.CLIENT;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
public TableSpanBuilder(ClusterConnection conn) {
|
||||
ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
|
||||
}
|
||||
|
||||
public TableSpanBuilder(AsyncConnectionImpl conn) {
|
||||
ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
public TableSpanBuilder setName(final String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TableSpanBuilder setSpanKind(final SpanKind spanKind) {
|
||||
this.spanKind = spanKind;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TableSpanBuilder setTableName(final TableName tableName) {
|
||||
populateTableNameAttributes(attributes, tableName);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Span build() {
|
||||
final SpanBuilder builder = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder(name)
|
||||
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
|
||||
.setSpanKind(spanKind);
|
||||
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
|
||||
return builder.startSpan();
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method that performs the primary logic of this builder. It is visible to other
|
||||
* classes in this package so that other builders can use this functionality as a mix-in.
|
||||
* @param attributes the attributes map to be populated.
|
||||
* @param tableName the source of attribute values.
|
||||
*/
|
||||
static void populateTableNameAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final TableName tableName
|
||||
) {
|
||||
attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
|
||||
attributes.put(DB_NAME, tableName.getNamespaceAsString());
|
||||
attributes.put(TABLE_KEY, tableName.getNameAsString());
|
||||
}
|
||||
}
|
|
@ -47,6 +47,11 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
|
|||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return "nothing";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,13 +17,25 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -31,6 +43,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -38,24 +51,26 @@ import org.apache.hadoop.hbase.Waiter;
|
|||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestAsyncRegionLocatorTracing {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncRegionLocatorTracing.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class);
|
||||
|
||||
private static Configuration CONF = HBaseConfiguration.create();
|
||||
private static final Configuration CONF = HBaseConfiguration.create();
|
||||
|
||||
private AsyncConnectionImpl conn;
|
||||
|
||||
|
@ -89,16 +104,35 @@ public class TestAsyncRegionLocatorTracing {
|
|||
}
|
||||
|
||||
private SpanData waitSpan(String name) {
|
||||
Waiter.waitFor(CONF, 1000,
|
||||
() -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
|
||||
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
|
||||
return waitSpan(hasName(name));
|
||||
}
|
||||
|
||||
private SpanData waitSpan(Matcher<SpanData> matcher) {
|
||||
Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
|
||||
try {
|
||||
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
|
||||
"waiting for span",
|
||||
() -> traceRule.getSpans(), hasItem(spanLocator)));
|
||||
} catch (AssertionError e) {
|
||||
LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
|
||||
traceRule.getSpans());
|
||||
throw e;
|
||||
}
|
||||
return traceRule.getSpans()
|
||||
.stream()
|
||||
.filter(spanLocator::matches)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearCache() {
|
||||
conn.getLocator().clearCache();
|
||||
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
|
||||
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -106,19 +140,22 @@ public class TestAsyncRegionLocatorTracing {
|
|||
ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis());
|
||||
conn.getLocator().clearCache(sn);
|
||||
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
|
||||
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
|
||||
assertEquals(sn.toString(), span.getAttributes().get(HBaseSemanticAttributes.SERVER_NAME_KEY));
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
hasAttributes(containsEntry("db.hbase.server.name", sn.getServerName()))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearCacheTableName() {
|
||||
conn.getLocator().clearCache(TableName.META_TABLE_NAME);
|
||||
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
|
||||
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
|
||||
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
|
||||
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -126,15 +163,14 @@ public class TestAsyncRegionLocatorTracing {
|
|||
conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
||||
RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join();
|
||||
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation");
|
||||
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
|
||||
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
|
||||
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
|
||||
List<String> regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY);
|
||||
assertEquals(1, regionNames.size());
|
||||
assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(),
|
||||
regionNames.get(0));
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME),
|
||||
hasAttributes(
|
||||
containsEntryWithStringValuesOf("db.hbase.regions",
|
||||
locs.getDefaultRegionLocation().getRegion().getRegionNameAsString()))));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -142,16 +178,16 @@ public class TestAsyncRegionLocatorTracing {
|
|||
conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
||||
RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join();
|
||||
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations");
|
||||
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
|
||||
assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
|
||||
assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
|
||||
span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
|
||||
List<String> regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY);
|
||||
assertEquals(3, regionNames.size());
|
||||
for (int i = 0; i < 3; i++) {
|
||||
assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(),
|
||||
regionNames.get(i));
|
||||
}
|
||||
String[] expectedRegions = Arrays.stream(locs.getRegionLocations())
|
||||
.map(HRegionLocation::getRegion)
|
||||
.map(RegionInfo::getRegionNameAsString)
|
||||
.toArray(String[]::new);
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME),
|
||||
hasAttributes(
|
||||
containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,24 +17,22 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
|
@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -99,7 +98,7 @@ public class TestAsyncTableTracing {
|
|||
|
||||
private ClientService.Interface stub;
|
||||
|
||||
private AsyncConnection conn;
|
||||
private AsyncConnectionImpl conn;
|
||||
|
||||
private AsyncTable<?> table;
|
||||
|
||||
|
@ -197,8 +196,9 @@ public class TestAsyncTableTracing {
|
|||
return null;
|
||||
}
|
||||
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
|
||||
final User user = UserProvider.instantiate(CONF).getCurrent();
|
||||
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test",
|
||||
UserProvider.instantiate(CONF).getCurrent()) {
|
||||
user) {
|
||||
|
||||
@Override
|
||||
AsyncRegionLocator getLocator() {
|
||||
|
@ -236,22 +236,13 @@ public class TestAsyncTableTracing {
|
|||
Closeables.close(conn, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* All {@link Span}s generated from table data access operations over {@code tableName} should
|
||||
* include these attributes.
|
||||
*/
|
||||
static Matcher<SpanData> buildBaseAttributesMatcher(TableName tableName) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("db.name", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.table", tableName.getNameAsString())));
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation) {
|
||||
assertTrace(tableOperation, new IsAnything<>());
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
|
||||
// n.b. this method implementation must match the one of the same name found in
|
||||
// TestHTableTracing
|
||||
final TableName tableName = table.getName();
|
||||
final Matcher<SpanData> spanLocator = allOf(
|
||||
hasName(containsString(tableOperation)), hasEnded());
|
||||
|
@ -269,7 +260,8 @@ public class TestAsyncTableTracing {
|
|||
hasName(expectedName),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
buildBaseAttributesMatcher(tableName),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(tableName),
|
||||
matcher));
|
||||
}
|
||||
|
||||
|
@ -524,16 +516,4 @@ public class TestAsyncTableTracing {
|
|||
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnClose() throws IOException {
|
||||
conn.close();
|
||||
Waiter.waitFor(CONF, 1000,
|
||||
() -> traceRule.getSpans().stream()
|
||||
.anyMatch(span -> span.getName().equals("AsyncConnection.close") &&
|
||||
span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
|
||||
SpanData data = traceRule.getSpans().stream()
|
||||
.filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get();
|
||||
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -238,6 +239,8 @@ public class TestHTableTracing extends TestTracingBase {
|
|||
}
|
||||
|
||||
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
|
||||
// n.b. this method implementation must match the one of the same name found in
|
||||
// TestAsyncTableTracing
|
||||
final TableName tableName = table.getName();
|
||||
final Matcher<SpanData> spanLocator = allOf(
|
||||
hasName(containsString(tableOperation)), hasEnded());
|
||||
|
@ -255,7 +258,8 @@ public class TestHTableTracing extends TestTracingBase {
|
|||
hasName(expectedName),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
buildBaseAttributesMatcher(tableName),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(tableName),
|
||||
matcher));
|
||||
}
|
||||
|
||||
|
|
|
@ -17,9 +17,23 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
|
@ -51,33 +65,65 @@ public class TestRegionLocatorTracing extends TestTracingBase {
|
|||
Closeables.close(conn, true);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetRegionLocation() throws IOException {
|
||||
conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocation(HConstants.EMPTY_START_ROW);
|
||||
assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocation",
|
||||
null, TableName.META_TABLE_NAME);
|
||||
SpanData span = waitSpan("HRegionLocator.getRegionLocation");
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME),
|
||||
hasAttributes(
|
||||
containsEntryWithStringValuesOf("db.hbase.regions",
|
||||
META_REGION_LOCATION.getDefaultRegionLocation().getRegion().getRegionNameAsString()))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRegionLocations() throws IOException {
|
||||
conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocations(HConstants.EMPTY_START_ROW);
|
||||
assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocations",
|
||||
null, TableName.META_TABLE_NAME);
|
||||
SpanData span = waitSpan("HRegionLocator.getRegionLocations");
|
||||
// TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations.
|
||||
String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations())
|
||||
.map(HRegionLocation::getRegion)
|
||||
.map(RegionInfo::getRegionNameAsString)
|
||||
.toArray(String[]::new);
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME),
|
||||
hasAttributes(
|
||||
containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllRegionLocations() throws IOException {
|
||||
conn.getRegionLocator(TableName.META_TABLE_NAME).getAllRegionLocations();
|
||||
assertTrace(HRegionLocator.class.getSimpleName(), "getAllRegionLocations",
|
||||
null, TableName.META_TABLE_NAME);
|
||||
SpanData span = waitSpan("HRegionLocator.getAllRegionLocations");
|
||||
// TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations.
|
||||
String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations())
|
||||
.map(HRegionLocation::getRegion)
|
||||
.map(RegionInfo::getRegionNameAsString)
|
||||
.toArray(String[]::new);
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME),
|
||||
hasAttributes(
|
||||
containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearRegionLocationCache() throws IOException {
|
||||
conn.getRegionLocator(TableName.META_TABLE_NAME).clearRegionLocationCache();
|
||||
assertTrace(HRegionLocator.class.getSimpleName(), "clearRegionLocationCache",
|
||||
null, TableName.META_TABLE_NAME);
|
||||
SpanData span = waitSpan("HRegionLocator.clearRegionLocationCache");
|
||||
assertThat(span, allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
buildConnectionAttributesMatcher(conn),
|
||||
buildTableAttributesMatcher(TableName.META_TABLE_NAME)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
|
@ -30,15 +34,20 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestTracingBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestTracingBase.class);
|
||||
|
||||
protected static final ServerName MASTER_HOST = ServerName.valueOf("localhost", 16010, 12345);
|
||||
protected static final RegionLocations META_REGION_LOCATION =
|
||||
|
@ -86,6 +95,28 @@ public class TestTracingBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected SpanData waitSpan(String name) {
|
||||
return waitSpan(hasName(name));
|
||||
}
|
||||
|
||||
protected SpanData waitSpan(Matcher<SpanData> matcher) {
|
||||
Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
|
||||
try {
|
||||
Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
|
||||
"waiting for span",
|
||||
() -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
|
||||
} catch (AssertionError e) {
|
||||
LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
|
||||
TRACE_RULE.getSpans());
|
||||
throw e;
|
||||
}
|
||||
return TRACE_RULE.getSpans()
|
||||
.stream()
|
||||
.filter(spanLocator::matches)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
}
|
||||
|
||||
static class RegistryForTracingTest implements ConnectionRegistry {
|
||||
|
||||
public RegistryForTracingTest(Configuration conf) {
|
||||
|
@ -106,6 +137,10 @@ public class TestTracingBase {
|
|||
return CompletableFuture.completedFuture(MASTER_HOST);
|
||||
}
|
||||
|
||||
@Override public String getConnectionString() {
|
||||
return "nothing";
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.hasProperty;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import java.util.Arrays;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
|
@ -48,6 +49,17 @@ public final class AttributesMatchers {
|
|||
return containsEntry(AttributeKey.stringKey(key), value);
|
||||
}
|
||||
|
||||
public static Matcher<Attributes> containsEntryWithStringValuesOf(String key, String... values) {
|
||||
return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values));
|
||||
}
|
||||
|
||||
public static Matcher<Attributes> containsEntryWithStringValuesOf(
|
||||
String key,
|
||||
Matcher<Iterable<? extends String>> matcher
|
||||
) {
|
||||
return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher);
|
||||
}
|
||||
|
||||
private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> {
|
||||
private final Matcher<AttributeKey<? super T>> keyMatcher;
|
||||
private final Matcher<? super T> valueMatcher;
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.trace.hamcrest;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||
import org.apache.hadoop.hbase.client.ConnectionImplementation;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
public final class TraceTestUtil {
|
||||
|
||||
private TraceTestUtil() { }
|
||||
|
||||
/**
|
||||
* All {@link Span}s involving {@code conn} should include these attributes.
|
||||
*/
|
||||
public static Matcher<SpanData> buildConnectionAttributesMatcher(AsyncConnectionImpl conn) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("db.system", "hbase"),
|
||||
containsEntry("db.connection_string", "nothing"),
|
||||
containsEntry("db.user", conn.getUser().toString())));
|
||||
}
|
||||
|
||||
/**
|
||||
* All {@link Span}s involving {@code conn} should include these attributes.
|
||||
* @see #buildConnectionAttributesMatcher(AsyncConnectionImpl)
|
||||
*/
|
||||
public static Matcher<SpanData> buildConnectionAttributesMatcher(ConnectionImplementation conn) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("db.system", "hbase"),
|
||||
containsEntry("db.connection_string", "nothing"),
|
||||
containsEntry("db.user", conn.getUser().toString())));
|
||||
}
|
||||
|
||||
/**
|
||||
* All {@link Span}s involving {@code tableName} should include these attributes.
|
||||
*/
|
||||
public static Matcher<SpanData> buildTableAttributesMatcher(TableName tableName) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("db.name", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.table", tableName.getNameAsString())));
|
||||
}
|
||||
}
|
|
@ -28,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class HBaseSemanticAttributes {
|
||||
public static final AttributeKey<String> DB_SYSTEM = SemanticAttributes.DB_SYSTEM;
|
||||
public static final String DB_SYSTEM_VALUE = SemanticAttributes.DbSystemValues.HBASE;
|
||||
public static final AttributeKey<String> DB_CONNECTION_STRING =
|
||||
SemanticAttributes.DB_CONNECTION_STRING;
|
||||
public static final AttributeKey<String> DB_USER = SemanticAttributes.DB_USER;
|
||||
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
|
||||
public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
|
||||
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
|
@ -30,7 +28,6 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Version;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -52,15 +49,6 @@ public final class TraceUtil {
|
|||
return createSpan(name, SpanKind.INTERNAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link SpanKind#INTERNAL} span and set table related attributes.
|
||||
*/
|
||||
public static Span createTableSpan(String spanName, TableName tableName) {
|
||||
return createSpan(spanName)
|
||||
.setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString())
|
||||
.setAttribute(TABLE_KEY, tableName.getNameAsString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one
|
||||
* {@link SpanKind#CLIENT} span and one {@link SpanKind#SERVER} span for a traced request, so use
|
||||
|
|
|
@ -74,6 +74,11 @@ public class RegionServerRegistry implements ConnectionRegistry {
|
|||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return "short-circuit";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// nothing
|
||||
|
|
Loading…
Reference in New Issue