HBASE-26472 Adhere to semantic conventions regarding table data operations

Follows the guidance outlined in https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e2/specification/trace/semantic_conventions/database.dm

* all table data operations are assumed to be of type CLIENT
* populate `db.name` and `db.operation` attributes
* name table data operation spans as `db.operation` `db.name`:`db.hbase.table`
  note: this implementation deviates from the recommended `db.name`.`db.sql.table` and instead
  uses HBase's native String representation of namespace:tablename.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
This commit is contained in:
Nick Dimiduk 2021-12-14 15:23:16 -08:00 committed by GitHub
parent a36d41af73
commit 8f5a12f794
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 601 additions and 78 deletions

View File

@ -193,6 +193,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -218,37 +222,49 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.replicaId(replicaId).call();
}
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
return new TableOperationSpanBuilder().setTableName(tableName);
}
@Override
public CompletableFuture<Result> get(Get get) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(get);
return tracedFuture(
() -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
"AsyncTable.get", tableName);
supplier);
}
@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(put);
return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call(), "AsyncTable.put", tableName);
.call(), supplier);
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(delete);
return tracedFuture(
() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call(),
"AsyncTable.delete", tableName);
supplier);
}
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(append);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
@ -257,12 +273,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
controller, loc, stub, append, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.append", tableName);
}, supplier);
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(increment);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
@ -271,7 +289,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.increment", tableName);
}, supplier);
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@ -329,6 +347,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
@ -336,12 +356,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
supplier);
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@ -349,23 +371,25 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
supplier);
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutation,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
supplier);
}
}
@ -397,6 +421,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
@ -405,11 +431,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
supplier);
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@ -417,22 +445,24 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
supplier);
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutation,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
supplier);
}
}
@ -443,6 +473,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
@ -488,16 +520,18 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
}, "AsyncTable.checkAndMutate", tableName);
}, supplier);
}
@Override
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
"AsyncTable.checkAndMutateList", tableName);
supplier);
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
@ -548,6 +582,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
@ -555,7 +591,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
.call(),
"AsyncTable.mutateRow", tableName);
supplier);
}
private Scan setDefaultScanConfig(Scan scan) {
@ -591,6 +627,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
@ -612,27 +650,35 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
});
return future;
}, "AsyncTable.scanAll", tableName);
}, supplier);
}
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
return tracedFutures(() -> voidMutate(puts), supplier);
}
@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
return tracedFutures(() -> voidMutate(deletes), supplier);
}
@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.trace;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Construct {@link io.opentelemetry.api.trace.Span} instances originating from
* "table operations" -- the verbs in our public API that interact with data in tables.
*/
@InterfaceAudience.Private
public class TableOperationSpanBuilder implements Supplier<Span> {
// n.b. The results of this class are tested implicitly by way of the likes of
// `TestAsyncTableTracing` and friends.
private static final String unknown = "UNKNOWN";
private TableName tableName;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
@Override public Span get() {
return build();
}
public TableOperationSpanBuilder setOperation(final Scan scan) {
return setOperation(valueFrom(scan));
}
public TableOperationSpanBuilder setOperation(final Row row) {
return setOperation(valueFrom(row));
}
@SuppressWarnings("unused")
public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) {
return setOperation(Operation.BATCH);
}
public TableOperationSpanBuilder setOperation(final Operation operation) {
attributes.put(DB_OPERATION, operation.name());
return this;
}
public TableOperationSpanBuilder setTableName(final TableName tableName) {
this.tableName = tableName;
attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
attributes.put(DB_NAME, tableName.getNamespaceAsString());
attributes.put(TABLE_KEY, tableName.getNameAsString());
return this;
}
@SuppressWarnings("unchecked")
public Span build() {
final String name = attributes.getOrDefault(DB_OPERATION, unknown)
+ " "
+ (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown);
final SpanBuilder builder = TraceUtil.getGlobalTracer()
.spanBuilder(name)
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
.setSpanKind(SpanKind.CLIENT);
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
return builder.startSpan();
}
private static Operation valueFrom(final Scan scan) {
if (scan == null) {
return null;
}
return Operation.SCAN;
}
private static Operation valueFrom(final Row row) {
if (row == null) {
return null;
}
if (row instanceof Append) {
return Operation.APPEND;
}
if (row instanceof CheckAndMutate) {
return Operation.CHECK_AND_MUTATE;
}
if (row instanceof Delete) {
return Operation.DELETE;
}
if (row instanceof Get) {
return Operation.GET;
}
if (row instanceof Increment) {
return Operation.INCREMENT;
}
if (row instanceof Put) {
return Operation.PUT;
}
if (row instanceof RegionCoprocessorServiceExec) {
return Operation.COPROC_EXEC;
}
if (row instanceof RowMutations) {
return Operation.BATCH;
}
return null;
}
}

View File

@ -17,15 +17,24 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@ -43,14 +52,18 @@ import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsAnything;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -59,10 +72,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@ -135,11 +146,17 @@ public class TestAsyncTableTracing {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ClientProtos.MultiResponse resp =
ClientProtos.MultiResponse.newBuilder()
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
.build();
ClientProtos.MultiRequest req = invocation.getArgument(1);
ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
for (ClientProtos.Action ignored : regionAction.getActionList()) {
raBuilder.addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
}
builder.addRegionActionResult(raBuilder);
}
ClientProtos.MultiResponse resp = builder.build();
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
@ -219,49 +236,73 @@ public class TestAsyncTableTracing {
Closeables.close(conn, true);
}
private void assertTrace(String methodName) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream()
.anyMatch(span -> span.getName().equals("AsyncTable." + methodName) &&
span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
SpanData data = traceRule.getSpans().stream()
.filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get();
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
TableName tableName = table.getName();
assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY));
assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY));
/**
* All {@link Span}s generated from table data access operations over {@code tableName} should
* include these attributes.
*/
private static Matcher<SpanData> buildBaseAttributesMatcher(TableName tableName) {
return hasAttributes(allOf(
containsEntry("db.name", tableName.getNamespaceAsString()),
containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
containsEntry("db.hbase.table", tableName.getNameAsString())));
}
private void assertTrace(String tableOperation) {
assertTrace(tableOperation, new IsAnything<>());
}
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
final TableName tableName = table.getName();
final Matcher<SpanData> spanLocator = allOf(
hasName(containsString(tableOperation)), hasEnded());
final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator)));
SpanData data = traceRule.getSpans()
.stream()
.filter(spanLocator::matches)
.findFirst()
.orElseThrow(AssertionError::new);
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),
hasStatusWithCode(StatusCode.OK),
buildBaseAttributesMatcher(tableName),
matcher));
}
@Test
public void testExists() {
table.exists(new Get(Bytes.toBytes(0))).join();
assertTrace("get");
assertTrace("GET");
}
@Test
public void testGet() {
table.get(new Get(Bytes.toBytes(0))).join();
assertTrace("get");
assertTrace("GET");
}
@Test
public void testPut() {
table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
assertTrace("put");
assertTrace("PUT");
}
@Test
public void testDelete() {
table.delete(new Delete(Bytes.toBytes(0))).join();
assertTrace("delete");
assertTrace("DELETE");
}
@Test
public void testAppend() {
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
assertTrace("append");
assertTrace("APPEND");
}
@Test
@ -270,21 +311,21 @@ public class TestAsyncTableTracing {
.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
.join();
assertTrace("increment");
assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue1() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
.join();
assertTrace("increment");
assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue2() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
Durability.ASYNC_WAL).join();
assertTrace("increment");
assertTrace("INCREMENT");
}
@Test
@ -292,7 +333,7 @@ public class TestAsyncTableTracing {
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))).join();
assertTrace("checkAndMutate");
assertTrace("CHECK_AND_MUTATE");
}
@Test
@ -302,7 +343,7 @@ public class TestAsyncTableTracing {
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("checkAndMutateList");
assertTrace("BATCH");
}
@Test
@ -310,19 +351,100 @@ public class TestAsyncTableTracing {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("checkAndMutateList");
assertTrace("BATCH");
}
private void testCheckAndMutateBuilder(Row op) {
AsyncTable.CheckAndMutateBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
} else if (op instanceof Delete) {
Delete delete = (Delete) op;
builder.thenDelete(delete).join();
} else if (op instanceof RowMutations) {
RowMutations mutations = (RowMutations) op;
builder.thenMutate(mutations).join();
} else {
fail("unsupported CheckAndPut operation " + op);
}
assertTrace("CHECK_AND_MUTATE");
}
@Test
public void testCheckAndMutateBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
testCheckAndMutateBuilder(put);
}
@Test
public void testCheckAndMutateBuilderThenDelete() {
testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
}
@Test
public void testCheckAndMutateBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
testCheckAndMutateBuilder(mutations);
}
private void testCheckAndMutateWithFilterBuilder(Row op) {
// use of `PrefixFilter` is completely arbitrary here.
AsyncTable.CheckAndMutateWithFilterBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
} else if (op instanceof Delete) {
Delete delete = (Delete) op;
builder.thenDelete(delete).join();
} else if (op instanceof RowMutations) {
RowMutations mutations = (RowMutations) op;
builder.thenMutate(mutations).join();
} else {
fail("unsupported CheckAndPut operation " + op);
}
assertTrace("CHECK_AND_MUTATE");
}
@Test
public void testCheckAndMutateWithFilterBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
testCheckAndMutateWithFilterBuilder(put);
}
@Test
public void testCheckAndMutateWithFilterBuilderThenDelete() {
testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
}
@Test
public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
testCheckAndMutateWithFilterBuilder(mutations);
}
@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
assertTrace("mutateRow");
assertTrace("BATCH");
}
@Test
public void testScanAll() throws IOException {
public void testScanAll() {
table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
assertTrace("scanAll");
assertTrace("SCAN");
}
@Test
@ -331,13 +453,13 @@ public class TestAsyncTableTracing {
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("getList");
assertTrace("BATCH");
}
@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("getList");
assertTrace("BATCH");
}
@Test
@ -345,13 +467,13 @@ public class TestAsyncTableTracing {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("getList");
assertTrace("BATCH");
}
@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("getList");
assertTrace("BATCH");
}
@Test
@ -360,14 +482,14 @@ public class TestAsyncTableTracing {
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("putList");
assertTrace("BATCH");
}
@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("putList");
assertTrace("BATCH");
}
@Test
@ -376,13 +498,13 @@ public class TestAsyncTableTracing {
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("deleteList");
assertTrace("BATCH");
}
@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("deleteList");
assertTrace("BATCH");
}
@Test
@ -391,13 +513,13 @@ public class TestAsyncTableTracing {
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("batch");
assertTrace("BATCH");
}
@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("batch");
assertTrace("BATCH");
}
@Test

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
/**
* Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}.
*/
public final class AttributesMatchers {
private AttributesMatchers() { }
public static <T> Matcher<Attributes> containsEntry(
Matcher<AttributeKey<? super T>> keyMatcher,
Matcher<? super T> valueMatcher
) {
return new IsAttributesContaining<>(keyMatcher, valueMatcher);
}
public static <T> Matcher<Attributes> containsEntry(AttributeKey<T> key, T value) {
return containsEntry(equalTo(key), equalTo(value));
}
public static Matcher<Attributes> containsEntry(String key, String value) {
return containsEntry(AttributeKey.stringKey(key), value);
}
private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> {
private final Matcher<AttributeKey<? super T>> keyMatcher;
private final Matcher<? super T> valueMatcher;
private IsAttributesContaining(
final Matcher<AttributeKey<? super T>> keyMatcher,
final Matcher<? super T> valueMatcher
) {
this.keyMatcher = keyMatcher;
this.valueMatcher = valueMatcher;
}
@Override
protected boolean matchesSafely(Attributes item) {
return item.asMap().entrySet().stream().anyMatch(e -> allOf(
hasProperty("key", keyMatcher),
hasProperty("value", valueMatcher))
.matches(e));
}
@Override
public void describeMismatchSafely(Attributes item, Description mismatchDescription) {
mismatchDescription
.appendText("Attributes was ")
.appendValueList("[", ", ", "]", item.asMap().entrySet());
}
@Override
public void describeTo(Description description) {
description
.appendText("Attributes containing [")
.appendDescriptionOf(keyMatcher)
.appendText("->")
.appendDescriptionOf(valueMatcher)
.appendText("]");
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
/**
* Helper methods for matching against instances of {@link SpanData}.
*/
public final class SpanDataMatchers {
private SpanDataMatchers() { }
public static Matcher<SpanData> hasAttributes(Matcher<Attributes> matcher) {
return new FeatureMatcher<SpanData, Attributes>(
matcher, "SpanData having attributes that ", "attributes"
) {
@Override protected Attributes featureValueOf(SpanData item) {
return item.getAttributes();
}
};
}
public static Matcher<SpanData> hasEnded() {
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
return item.hasEnded();
}
@Override public void describeTo(Description description) {
description.appendText("SpanData that hasEnded");
}
};
}
public static Matcher<SpanData> hasKind(SpanKind kind) {
return new FeatureMatcher<SpanData, SpanKind>(
equalTo(kind), "SpanData with kind that", "SpanKind") {
@Override protected SpanKind featureValueOf(SpanData item) {
return item.getKind();
}
};
}
public static Matcher<SpanData> hasName(String name) {
return hasName(equalTo(name));
}
public static Matcher<SpanData> hasName(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a name that", "name") {
@Override protected String featureValueOf(SpanData item) {
return item.getName();
}
};
}
public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
final Matcher<StatusCode> matcher = is(equalTo(statusCode));
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
final StatusData statusData = item.getStatus();
return statusData != null
&& statusData.getStatusCode() != null
&& matcher.matches(statusData.getStatusCode());
}
@Override public void describeTo(Description description) {
description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
}
};
}
}

View File

@ -28,7 +28,9 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public final class HBaseSemanticAttributes {
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes {
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
/**
* These are values used with {@link #DB_OPERATION}. They correspond with the implementations of
* {@code org.apache.hadoop.hbase.client.Operation}, as well as
* {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple
* operations.
*/
public enum Operation {
APPEND,
BATCH,
CHECK_AND_MUTATE,
COPROC_EXEC,
DELETE,
GET,
INCREMENT,
PUT,
SCAN,
}
private HBaseSemanticAttributes() { }
}

View File

@ -91,9 +91,11 @@ public final class TraceUtil {
/**
* Trace an asynchronous operation for a table.
*/
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
String spanName, TableName tableName) {
Span span = createTableSpan(spanName, tableName);
public static <T> CompletableFuture<T> tracedFuture(
Supplier<CompletableFuture<T>> action,
Supplier<Span> spanSupplier
) {
Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
endSpan(future, span);
@ -119,8 +121,10 @@ public final class TraceUtil {
* {@code futures} are completed.
*/
public static <T> List<CompletableFuture<T>> tracedFutures(
Supplier<List<CompletableFuture<T>>> action, String spanName, TableName tableName) {
Span span = createTableSpan(spanName, tableName);
Supplier<List<CompletableFuture<T>>> action,
Supplier<Span> spanSupplier
) {
Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
List<CompletableFuture<T>> futures = action.get();
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);