HBASE-26473 Introduce `db.hbase.container_operations` span attribute
For batch operations, collect and annotate the associated span with the set of all operations contained in the batch. Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
dffeb8e63e
commit
5e01534494
|
@ -30,6 +30,7 @@ import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.api.trace.SpanKind;
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
@ -406,7 +407,8 @@ public class HTable implements Table {
|
||||||
public Result[] get(List<Get> gets) throws IOException {
|
public Result[] get(List<Get> gets) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(gets);
|
||||||
return TraceUtil.trace(() -> {
|
return TraceUtil.trace(() -> {
|
||||||
if (gets.size() == 1) {
|
if (gets.size() == 1) {
|
||||||
return new Result[] { get(gets.get(0)) };
|
return new Result[] { get(gets.get(0)) };
|
||||||
|
@ -433,7 +435,8 @@ public class HTable implements Table {
|
||||||
throws InterruptedException, IOException {
|
throws InterruptedException, IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(actions);
|
||||||
TraceUtil.traceWithIOException(() -> {
|
TraceUtil.traceWithIOException(() -> {
|
||||||
int rpcTimeout = writeRpcTimeoutMs;
|
int rpcTimeout = writeRpcTimeoutMs;
|
||||||
boolean hasRead = false;
|
boolean hasRead = false;
|
||||||
|
@ -473,6 +476,7 @@ public class HTable implements Table {
|
||||||
final Span span = new TableOperationSpanBuilder(connection)
|
final Span span = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(actions)
|
||||||
.build();
|
.build();
|
||||||
try (Scope ignored = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
AsyncRequestFuture ars = multiAp.submit(task);
|
AsyncRequestFuture ars = multiAp.submit(task);
|
||||||
|
@ -481,6 +485,7 @@ public class HTable implements Table {
|
||||||
TraceUtil.setError(span, ars.getErrors());
|
TraceUtil.setError(span, ars.getErrors());
|
||||||
throw ars.getErrors();
|
throw ars.getErrors();
|
||||||
}
|
}
|
||||||
|
span.setStatus(StatusCode.OK);
|
||||||
} finally {
|
} finally {
|
||||||
span.end();
|
span.end();
|
||||||
}
|
}
|
||||||
|
@ -512,6 +517,7 @@ public class HTable implements Table {
|
||||||
final Span span = new TableOperationSpanBuilder(connection)
|
final Span span = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(actions)
|
||||||
.build();
|
.build();
|
||||||
try (Scope ignored = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
|
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
|
||||||
|
@ -551,7 +557,8 @@ public class HTable implements Table {
|
||||||
public void delete(final List<Delete> deletes) throws IOException {
|
public void delete(final List<Delete> deletes) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(deletes);
|
||||||
TraceUtil.traceWithIOException(() -> {
|
TraceUtil.traceWithIOException(() -> {
|
||||||
Object[] results = new Object[deletes.size()];
|
Object[] results = new Object[deletes.size()];
|
||||||
try {
|
try {
|
||||||
|
@ -600,7 +607,8 @@ public class HTable implements Table {
|
||||||
public void put(final List<Put> puts) throws IOException {
|
public void put(final List<Put> puts) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(puts);
|
||||||
TraceUtil.traceWithIOException(() -> {
|
TraceUtil.traceWithIOException(() -> {
|
||||||
for (Put put : puts) {
|
for (Put put : puts) {
|
||||||
validatePut(put);
|
validatePut(put);
|
||||||
|
@ -618,7 +626,8 @@ public class HTable implements Table {
|
||||||
public Result mutateRow(final RowMutations rm) throws IOException {
|
public Result mutateRow(final RowMutations rm) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(rm);
|
||||||
return TraceUtil.trace(() -> {
|
return TraceUtil.trace(() -> {
|
||||||
long nonceGroup = getNonceGroup();
|
long nonceGroup = getNonceGroup();
|
||||||
long nonce = getNonce();
|
long nonce = getNonce();
|
||||||
|
@ -773,7 +782,8 @@ public class HTable implements Table {
|
||||||
final byte [] value, final Put put) throws IOException {
|
final byte [] value, final Put put) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
|
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
|
||||||
.isSuccess(),
|
.isSuccess(),
|
||||||
|
@ -786,7 +796,8 @@ public class HTable implements Table {
|
||||||
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||||
null, put).isSuccess(),
|
null, put).isSuccess(),
|
||||||
|
@ -799,7 +810,8 @@ public class HTable implements Table {
|
||||||
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
|
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
|
||||||
supplier);
|
supplier);
|
||||||
|
@ -811,7 +823,8 @@ public class HTable implements Table {
|
||||||
final byte[] value, final Delete delete) throws IOException {
|
final byte[] value, final Delete delete) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
|
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
|
||||||
delete).isSuccess(),
|
delete).isSuccess(),
|
||||||
|
@ -824,7 +837,8 @@ public class HTable implements Table {
|
||||||
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||||
null, delete).isSuccess(),
|
null, delete).isSuccess(),
|
||||||
|
@ -837,7 +851,8 @@ public class HTable implements Table {
|
||||||
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
|
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
|
||||||
supplier);
|
supplier);
|
||||||
|
@ -914,7 +929,8 @@ public class HTable implements Table {
|
||||||
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
|
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(rm);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||||
null, rm).isSuccess(),
|
null, rm).isSuccess(),
|
||||||
|
@ -927,7 +943,8 @@ public class HTable implements Table {
|
||||||
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(rm);
|
||||||
return TraceUtil.trace(
|
return TraceUtil.trace(
|
||||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
|
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
|
||||||
supplier);
|
supplier);
|
||||||
|
@ -937,7 +954,8 @@ public class HTable implements Table {
|
||||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(checkAndMutate);
|
.setOperation(checkAndMutate)
|
||||||
|
.setContainerOperations(checkAndMutate);
|
||||||
return TraceUtil.trace(() -> {
|
return TraceUtil.trace(() -> {
|
||||||
Row action = checkAndMutate.getAction();
|
Row action = checkAndMutate.getAction();
|
||||||
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
|
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
|
||||||
|
@ -986,7 +1004,8 @@ public class HTable implements Table {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(checkAndMutates);
|
||||||
return TraceUtil.trace(() -> {
|
return TraceUtil.trace(() -> {
|
||||||
if (checkAndMutates.isEmpty()) {
|
if (checkAndMutates.isEmpty()) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
|
@ -1056,7 +1075,8 @@ public class HTable implements Table {
|
||||||
public boolean[] exists(List<Get> gets) throws IOException {
|
public boolean[] exists(List<Get> gets) throws IOException {
|
||||||
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
|
||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||||
|
.setContainerOperations(gets);
|
||||||
return TraceUtil.trace(() -> {
|
return TraceUtil.trace(() -> {
|
||||||
if (gets.isEmpty()) {
|
if (gets.isEmpty()) {
|
||||||
return new boolean[] {};
|
return new boolean[] {};
|
||||||
|
|
|
@ -386,7 +386,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||||
preCheck();
|
preCheck();
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(put);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
|
||||||
|
@ -401,7 +402,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||||
preCheck();
|
preCheck();
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(delete);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
||||||
|
@ -417,7 +419,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
preCheck();
|
preCheck();
|
||||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(mutations);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this
|
() -> RawAsyncTableImpl.this
|
||||||
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
||||||
|
@ -460,7 +463,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(put);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||||
|
@ -475,7 +479,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(delete);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
||||||
|
@ -490,7 +495,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
|
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
|
||||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
|
||||||
|
.setContainerOperations(mutations);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> RawAsyncTableImpl.this
|
() -> RawAsyncTableImpl.this
|
||||||
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
||||||
|
@ -512,7 +518,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
|
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(checkAndMutate);
|
.setOperation(checkAndMutate)
|
||||||
|
.setContainerOperations(checkAndMutate.getAction());
|
||||||
return tracedFuture(() -> {
|
return tracedFuture(() -> {
|
||||||
if (checkAndMutate.getAction() instanceof Put ||
|
if (checkAndMutate.getAction() instanceof Put ||
|
||||||
checkAndMutate.getAction() instanceof Delete ||
|
checkAndMutate.getAction() instanceof Delete ||
|
||||||
|
@ -565,7 +572,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
public List<CompletableFuture<CheckAndMutateResult>>
|
public List<CompletableFuture<CheckAndMutateResult>>
|
||||||
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
|
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(checkAndMutates);
|
.setOperation(checkAndMutates)
|
||||||
|
.setContainerOperations(checkAndMutates);
|
||||||
return tracedFutures(
|
return tracedFutures(
|
||||||
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
|
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
|
||||||
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
|
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
|
||||||
|
@ -621,7 +629,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||||
long nonce = conn.getNonceGenerator().newNonce();
|
long nonce = conn.getNonceGenerator().newNonce();
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(mutations);
|
.setOperation(mutations)
|
||||||
|
.setContainerOperations(mutations);
|
||||||
return tracedFuture(
|
return tracedFuture(
|
||||||
() -> this
|
() -> this
|
||||||
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
|
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
|
||||||
|
@ -694,28 +703,32 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
@Override
|
@Override
|
||||||
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(gets);
|
.setOperation(gets)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
|
||||||
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
|
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CompletableFuture<Void>> put(List<Put> puts) {
|
public List<CompletableFuture<Void>> put(List<Put> puts) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(puts);
|
.setOperation(puts)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
|
||||||
return tracedFutures(() -> voidMutate(puts), supplier);
|
return tracedFutures(() -> voidMutate(puts), supplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
|
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(deletes);
|
.setOperation(deletes)
|
||||||
|
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
|
||||||
return tracedFutures(() -> voidMutate(deletes), supplier);
|
return tracedFutures(() -> voidMutate(deletes), supplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
|
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
|
||||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||||
.setOperation(actions);
|
.setOperation(actions)
|
||||||
|
.setContainerOperations(actions);
|
||||||
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
|
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,15 +18,22 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.client.trace;
|
package org.apache.hadoop.hbase.client.trace;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
|
||||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
|
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
|
||||||
import io.opentelemetry.api.common.AttributeKey;
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.api.trace.SpanBuilder;
|
import io.opentelemetry.api.trace.SpanBuilder;
|
||||||
import io.opentelemetry.api.trace.SpanKind;
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||||
|
@ -90,6 +97,76 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// `setContainerOperations` perform a recursive descent expansion of all the operations
|
||||||
|
// contained within the provided "batch" object.
|
||||||
|
|
||||||
|
public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
|
||||||
|
final Operation[] ops = mutations.getMutations()
|
||||||
|
.stream()
|
||||||
|
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
|
||||||
|
.toArray(Operation[]::new);
|
||||||
|
return setContainerOperations(ops);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableOperationSpanBuilder setContainerOperations(final Row row) {
|
||||||
|
final Operation[] ops =
|
||||||
|
Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
|
||||||
|
.toArray(Operation[]::new);
|
||||||
|
return setContainerOperations(ops);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableOperationSpanBuilder setContainerOperations(
|
||||||
|
final Collection<? extends Row> operations
|
||||||
|
) {
|
||||||
|
final Operation[] ops = operations.stream()
|
||||||
|
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
|
||||||
|
.toArray(Operation[]::new);
|
||||||
|
return setContainerOperations(ops);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Set<Operation> unpackRowOperations(final Row row) {
|
||||||
|
final Set<Operation> ops = new HashSet<>();
|
||||||
|
if (row instanceof CheckAndMutate) {
|
||||||
|
final CheckAndMutate cam = (CheckAndMutate) row;
|
||||||
|
ops.addAll(unpackRowOperations(cam));
|
||||||
|
}
|
||||||
|
if (row instanceof RowMutations) {
|
||||||
|
final RowMutations mutations = (RowMutations) row;
|
||||||
|
final List<Operation> operations = mutations.getMutations()
|
||||||
|
.stream()
|
||||||
|
.map(TableOperationSpanBuilder::valueFrom)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
ops.addAll(operations);
|
||||||
|
}
|
||||||
|
return ops;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
|
||||||
|
final Set<Operation> ops = new HashSet<>();
|
||||||
|
final Operation op = valueFrom(cam.getAction());
|
||||||
|
switch (op) {
|
||||||
|
case BATCH:
|
||||||
|
case CHECK_AND_MUTATE:
|
||||||
|
ops.addAll(unpackRowOperations(cam.getAction()));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ops.add(op);
|
||||||
|
}
|
||||||
|
return ops;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableOperationSpanBuilder setContainerOperations(
|
||||||
|
final Operation... operations
|
||||||
|
) {
|
||||||
|
final List<String> ops = Arrays.stream(operations)
|
||||||
|
.map(op -> op == null ? unknown : op.name())
|
||||||
|
.sorted()
|
||||||
|
.distinct()
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public TableOperationSpanBuilder setTableName(final TableName tableName) {
|
public TableOperationSpanBuilder setTableName(final TableName tableName) {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
|
TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
|
||||||
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||||
|
@ -335,7 +337,9 @@ public class TestAsyncTableTracing {
|
||||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||||
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
|
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -343,7 +347,9 @@ public class TestAsyncTableTracing {
|
||||||
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||||
.build(new Delete(Bytes.toBytes(0))))).join();
|
.build(new Delete(Bytes.toBytes(0))))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testCheckAndMutateBuilder(Row op) {
|
private void testCheckAndMutateBuilder(Row op) {
|
||||||
|
@ -428,12 +434,14 @@ public class TestAsyncTableTracing {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMutateRow() throws Exception {
|
public void testMutateRow() throws IOException {
|
||||||
byte[] row = Bytes.toBytes(0);
|
final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
|
||||||
RowMutations mutation = new RowMutations(row);
|
.add((Mutation) new Put(Bytes.toBytes(0))
|
||||||
mutation.add(new Delete(row));
|
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
|
||||||
table.mutateRow(mutation).get();
|
.add((Mutation) new Delete(Bytes.toBytes(0)));
|
||||||
assertTrace("BATCH");
|
table.mutateRow(mutations).join();
|
||||||
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -448,13 +456,15 @@ public class TestAsyncTableTracing {
|
||||||
.allOf(
|
.allOf(
|
||||||
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExistsAll() {
|
public void testExistsAll() {
|
||||||
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -462,13 +472,15 @@ public class TestAsyncTableTracing {
|
||||||
CompletableFuture
|
CompletableFuture
|
||||||
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAll() {
|
public void testGetAll() {
|
||||||
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -477,14 +489,16 @@ public class TestAsyncTableTracing {
|
||||||
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||||
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
|
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutAll() {
|
public void testPutAll() {
|
||||||
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||||
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
|
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -493,13 +507,15 @@ public class TestAsyncTableTracing {
|
||||||
.allOf(
|
.allOf(
|
||||||
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteAll() {
|
public void testDeleteAll() {
|
||||||
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -508,12 +524,14 @@ public class TestAsyncTableTracing {
|
||||||
.allOf(
|
.allOf(
|
||||||
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||||
.join();
|
.join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchAll() {
|
public void testBatchAll() {
|
||||||
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
|
||||||
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||||
|
@ -320,7 +322,9 @@ public class TestHTableTracing extends TestTracingBase {
|
||||||
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||||
.build(new Delete(Bytes.toBytes(0))));
|
.build(new Delete(Bytes.toBytes(0))));
|
||||||
assertTrace("CHECK_AND_MUTATE");
|
assertTrace("CHECK_AND_MUTATE", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -328,7 +332,9 @@ public class TestHTableTracing extends TestTracingBase {
|
||||||
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||||
.build(new Delete(Bytes.toBytes(0)))));
|
.build(new Delete(Bytes.toBytes(0)))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -336,51 +342,67 @@ public class TestHTableTracing extends TestTracingBase {
|
||||||
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||||
.build(new Delete(Bytes.toBytes(0)))));
|
.build(new Delete(Bytes.toBytes(0)))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMutateRow() throws Exception {
|
public void testMutateRow() throws Exception {
|
||||||
byte[] row = Bytes.toBytes(0);
|
byte[] row = Bytes.toBytes(0);
|
||||||
table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
|
table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExistsList() throws IOException {
|
public void testExistsList() throws IOException {
|
||||||
table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
|
table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExistsAll() throws IOException {
|
public void testExistsAll() throws IOException {
|
||||||
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
|
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetList() throws IOException {
|
public void testGetList() throws IOException {
|
||||||
table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
|
table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "GET")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutList() throws IOException {
|
public void testPutList() throws IOException {
|
||||||
table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||||
Bytes.toBytes("cq"), Bytes.toBytes("v"))));
|
Bytes.toBytes("cq"), Bytes.toBytes("v"))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "PUT")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteList() throws IOException {
|
public void testDeleteList() throws IOException {
|
||||||
table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
|
table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchList() throws IOException, InterruptedException {
|
public void testBatchList() throws IOException, InterruptedException {
|
||||||
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
|
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
|
||||||
assertTrace("BATCH");
|
assertTrace("BATCH", hasAttributes(
|
||||||
|
containsEntryWithStringValuesOf(
|
||||||
|
"db.hbase.container_operations", "DELETE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -388,5 +410,4 @@ public class TestHTableTracing extends TestTracingBase {
|
||||||
table.close();
|
table.close();
|
||||||
assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME);
|
assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,7 @@ public class TestTracingBase {
|
||||||
conf = HBaseConfiguration.create();
|
conf = HBaseConfiguration.create();
|
||||||
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
||||||
RegistryForTracingTest.class.getName());
|
RegistryForTracingTest.class.getName());
|
||||||
|
TRACE_RULE.clearSpans();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void assertTrace(String className, String methodName, ServerName serverName,
|
protected void assertTrace(String className, String methodName, ServerName serverName,
|
||||||
|
|
|
@ -36,6 +36,12 @@ public final class HBaseSemanticAttributes {
|
||||||
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
|
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
|
||||||
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
|
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
|
||||||
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
|
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
|
||||||
|
/**
|
||||||
|
* For operations that themselves ship one or more operations, such as
|
||||||
|
* {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}.
|
||||||
|
*/
|
||||||
|
public static final AttributeKey<List<String>> CONTAINER_DB_OPERATIONS_KEY =
|
||||||
|
AttributeKey.stringArrayKey("db.hbase.container_operations");
|
||||||
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
|
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
|
||||||
AttributeKey.stringArrayKey("db.hbase.regions");
|
AttributeKey.stringArrayKey("db.hbase.regions");
|
||||||
public static final AttributeKey<String> RPC_SERVICE_KEY =
|
public static final AttributeKey<String> RPC_SERVICE_KEY =
|
||||||
|
|
Loading…
Reference in New Issue