HBASE-26472 Adhere to semantic conventions regarding table data operations
Follows the guidance outlined in https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e2/specification/trace/semantic_conventions/database.dm * all table data operations are assumed to be of type CLIENT * populate `db.name` and `db.operation` attributes * name table data operation spans as `db.operation` `db.name`:`db.hbase.table` note: this implementation deviates from the recommended `db.name`.`db.sql.table` and instead uses HBase's native String representation of namespace:tablename. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
This commit is contained in:
parent
5defd8c35f
commit
c7a8e428df
@ -200,6 +200,11 @@
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-crypto</artifactId>
|
||||
|
@ -28,6 +28,8 @@ import com.google.protobuf.Message;
|
||||
import com.google.protobuf.Service;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
@ -42,6 +44,7 @@ import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
@ -50,11 +53,13 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
@ -354,9 +359,10 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public Result get(final Get get) throws IOException {
|
||||
return TraceUtil.trace(
|
||||
() -> get(get, get.isCheckExistenceOnly()),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName));
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(get);
|
||||
return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
|
||||
}
|
||||
|
||||
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
|
||||
@ -396,6 +402,9 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
if (gets.size() == 1) {
|
||||
return new Result[] { get(gets.get(0)) };
|
||||
@ -414,12 +423,15 @@ public class HTable implements Table {
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batch(final List<? extends Row> actions, final Object[] results)
|
||||
throws InterruptedException, IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
int rpcTimeout = writeRpcTimeoutMs;
|
||||
boolean hasRead = false;
|
||||
@ -442,7 +454,7 @@ public class HTable implements Table {
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".batch", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
|
||||
@ -456,10 +468,19 @@ public class HTable implements Table {
|
||||
.setOperationTimeout(operationTimeoutMs)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
final Span span = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||
.build();
|
||||
try (Scope ignored = span.makeCurrent()) {
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
TraceUtil.setError(span, ars.getErrors());
|
||||
throw ars.getErrors();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
@ -486,15 +507,27 @@ public class HTable implements Table {
|
||||
.setRpcTimeout(writeTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
final Span span = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
|
||||
.build();
|
||||
try (Scope ignored = span.makeCurrent()) {
|
||||
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
TraceUtil.setError(span, ars.getErrors());
|
||||
throw ars.getErrors();
|
||||
}
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(delete);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
ClientServiceCallable<Void> callable =
|
||||
new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
|
||||
@ -509,12 +542,14 @@ public class HTable implements Table {
|
||||
};
|
||||
rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
|
||||
.callWithRetries(callable, this.operationTimeoutMs);
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".delete", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(final List<Delete> deletes)
|
||||
throws IOException {
|
||||
public void delete(final List<Delete> deletes) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
Object[] results = new Object[deletes.size()];
|
||||
try {
|
||||
@ -533,11 +568,14 @@ public class HTable implements Table {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".deleteList", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(put);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
validatePut(put);
|
||||
ClientServiceCallable<Void> callable =
|
||||
@ -553,11 +591,14 @@ public class HTable implements Table {
|
||||
};
|
||||
rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
|
||||
.callWithRetries(callable, this.operationTimeoutMs);
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".put", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final List<Put> puts) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
TraceUtil.traceWithIOException(() -> {
|
||||
for (Put put : puts) {
|
||||
validatePut(put);
|
||||
@ -568,11 +609,14 @@ public class HTable implements Table {
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".putList", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result mutateRow(final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
long nonceGroup = getNonceGroup();
|
||||
long nonce = getNonce();
|
||||
@ -613,7 +657,7 @@ public class HTable implements Table {
|
||||
throw ars.getErrors();
|
||||
}
|
||||
return (Result) results[0];
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".mutateRow", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
private long getNonceGroup() {
|
||||
@ -626,6 +670,9 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public Result append(final Append append) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(append);
|
||||
return TraceUtil.trace(() -> {
|
||||
checkHasFamilies(append);
|
||||
NoncedRegionServerCallable<Result> callable =
|
||||
@ -645,11 +692,14 @@ public class HTable implements Table {
|
||||
};
|
||||
return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).
|
||||
callWithRetries(callable, this.operationTimeoutMs);
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".append", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(final Increment increment) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(increment);
|
||||
return TraceUtil.trace(() -> {
|
||||
checkHasFamilies(increment);
|
||||
NoncedRegionServerCallable<Result> callable =
|
||||
@ -667,22 +717,23 @@ public class HTable implements Table {
|
||||
};
|
||||
return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
|
||||
this.operationTimeoutMs);
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount)
|
||||
throws IOException {
|
||||
return TraceUtil.trace(
|
||||
() -> incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
|
||||
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount, final Durability durability)
|
||||
throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
|
||||
return TraceUtil.trace(() -> {
|
||||
NullPointerException npe = null;
|
||||
if (row == null) {
|
||||
@ -711,65 +762,83 @@ public class HTable implements Table {
|
||||
};
|
||||
return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).
|
||||
callWithRetries(callable, this.operationTimeoutMs);
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
|
||||
.isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, put).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
|
||||
delete).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, delete).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -841,25 +910,32 @@ public class HTable implements Table {
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, rm).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
|
||||
tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
|
||||
tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(checkAndMutate);
|
||||
return TraceUtil.trace(() -> {
|
||||
Row action = checkAndMutate.getAction();
|
||||
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
|
||||
@ -875,8 +951,7 @@ public class HTable implements Table {
|
||||
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
|
||||
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
|
||||
}
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
|
||||
tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
|
||||
@ -907,6 +982,9 @@ public class HTable implements Table {
|
||||
@Override
|
||||
public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
|
||||
throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
if (checkAndMutates.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
@ -929,8 +1007,7 @@ public class HTable implements Table {
|
||||
ret.add((CheckAndMutateResult) r);
|
||||
}
|
||||
return ret;
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutateList",
|
||||
tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
private CompareOperator toCompareOperator(CompareOp compareOp) {
|
||||
@ -963,15 +1040,21 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public boolean exists(final Get get) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(get);
|
||||
return TraceUtil.trace(() -> {
|
||||
Result r = get(get, true);
|
||||
assert r.getExists() != null;
|
||||
return r.getExists();
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean[] exists(List<Get> gets) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
|
||||
return TraceUtil.trace(() -> {
|
||||
if (gets.isEmpty()) {
|
||||
return new boolean[] {};
|
||||
@ -1003,7 +1086,7 @@ public class HTable implements Table {
|
||||
}
|
||||
|
||||
return results;
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1373,30 +1456,39 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public boolean thenPut(Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
validatePut(put);
|
||||
preCheck();
|
||||
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
|
||||
.isSuccess();
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
preCheck();
|
||||
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
|
||||
.isSuccess();
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
preCheck();
|
||||
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation)
|
||||
.isSuccess();
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1419,26 +1511,35 @@ public class HTable implements Table {
|
||||
|
||||
@Override
|
||||
public boolean thenPut(Put put) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(() -> {
|
||||
validatePut(put);
|
||||
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put)
|
||||
.isSuccess();
|
||||
}, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName));
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName));
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
final Supplier<Span> supplier = new TableOperationSpanBuilder()
|
||||
.setTableName(tableName)
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return TraceUtil.trace(
|
||||
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
|
||||
.isSuccess(),
|
||||
() -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName));
|
||||
supplier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -37,6 +38,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
@ -256,37 +260,49 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
.replicaId(replicaId).call();
|
||||
}
|
||||
|
||||
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
|
||||
return new TableOperationSpanBuilder().setTableName(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> get(Get get) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(get);
|
||||
return tracedFuture(
|
||||
() -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
|
||||
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
|
||||
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
|
||||
"AsyncTable.get", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> put(Put put) {
|
||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(put);
|
||||
return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
|
||||
put, RequestConverter::buildMutateRequest))
|
||||
.call(), "AsyncTable.put", tableName);
|
||||
.call(), supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> delete(Delete delete) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(delete);
|
||||
return tracedFuture(
|
||||
() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
|
||||
stub, delete, RequestConverter::buildMutateRequest))
|
||||
.call(),
|
||||
"AsyncTable.delete", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> append(Append append) {
|
||||
checkHasFamilies(append);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(append);
|
||||
return tracedFuture(() -> {
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
@ -295,12 +311,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
controller, loc, stub, append, RequestConverter::buildMutateRequest,
|
||||
RawAsyncTableImpl::toResult))
|
||||
.call();
|
||||
}, "AsyncTable.append", tableName);
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> increment(Increment increment) {
|
||||
checkHasFamilies(increment);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(increment);
|
||||
return tracedFuture(() -> {
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
@ -309,7 +327,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
|
||||
RawAsyncTableImpl::toResult))
|
||||
.call();
|
||||
}, "AsyncTable.increment", tableName);
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
|
||||
@ -367,6 +385,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||
preCheck();
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
|
||||
@ -374,12 +394,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
preCheck();
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
||||
@ -387,23 +409,25 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
|
||||
preCheck();
|
||||
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
|
||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this
|
||||
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
|
||||
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
|
||||
mutation,
|
||||
mutations,
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
CheckAndMutateResult::isSuccess))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
|
||||
supplier);
|
||||
}
|
||||
}
|
||||
|
||||
@ -435,6 +459,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||
@ -443,11 +469,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
|
||||
@ -455,22 +483,24 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
|
||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
|
||||
return tracedFuture(
|
||||
() -> RawAsyncTableImpl.this
|
||||
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
|
||||
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
|
||||
mutation,
|
||||
mutations,
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
|
||||
timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
CheckAndMutateResult::isSuccess))
|
||||
.call(),
|
||||
"AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
|
||||
supplier);
|
||||
}
|
||||
}
|
||||
|
||||
@ -481,6 +511,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(checkAndMutate);
|
||||
return tracedFuture(() -> {
|
||||
if (checkAndMutate.getAction() instanceof Put ||
|
||||
checkAndMutate.getAction() instanceof Delete ||
|
||||
@ -526,16 +558,18 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
|
||||
return future;
|
||||
}
|
||||
}, "AsyncTable.checkAndMutate", tableName);
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<CheckAndMutateResult>>
|
||||
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(checkAndMutates);
|
||||
return tracedFutures(
|
||||
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
|
||||
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
|
||||
"AsyncTable.checkAndMutateList", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
|
||||
@ -586,6 +620,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(mutations);
|
||||
return tracedFuture(
|
||||
() -> this
|
||||
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
|
||||
@ -593,7 +629,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
|
||||
resp -> resp))
|
||||
.call(),
|
||||
"AsyncTable.mutateRow", tableName);
|
||||
supplier);
|
||||
}
|
||||
|
||||
private Scan setDefaultScanConfig(Scan scan) {
|
||||
@ -629,6 +665,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<Result>> scanAll(Scan scan) {
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(scan);
|
||||
return tracedFuture(() -> {
|
||||
CompletableFuture<List<Result>> future = new CompletableFuture<>();
|
||||
List<Result> scanResults = new ArrayList<>();
|
||||
@ -650,27 +688,35 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}, "AsyncTable.scanAll", tableName);
|
||||
}, supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
||||
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(gets);
|
||||
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Void>> put(List<Put> puts) {
|
||||
return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(puts);
|
||||
return tracedFutures(() -> voidMutate(puts), supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
|
||||
return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(deletes);
|
||||
return tracedFutures(() -> voidMutate(deletes), supplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
|
||||
return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
|
||||
final Supplier<Span> supplier = newTableOperationSpanBuilder()
|
||||
.setOperation(actions);
|
||||
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
|
||||
}
|
||||
|
||||
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
|
||||
|
@ -0,0 +1,143 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Construct {@link io.opentelemetry.api.trace.Span} instances originating from
|
||||
* "table operations" -- the verbs in our public API that interact with data in tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableOperationSpanBuilder implements Supplier<Span> {
|
||||
|
||||
// n.b. The results of this class are tested implicitly by way of the likes of
|
||||
// `TestAsyncTableTracing` and friends.
|
||||
|
||||
private static final String unknown = "UNKNOWN";
|
||||
|
||||
private TableName tableName;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
@Override public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
public TableOperationSpanBuilder setOperation(final Scan scan) {
|
||||
return setOperation(valueFrom(scan));
|
||||
}
|
||||
|
||||
public TableOperationSpanBuilder setOperation(final Row row) {
|
||||
return setOperation(valueFrom(row));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) {
|
||||
return setOperation(Operation.BATCH);
|
||||
}
|
||||
|
||||
public TableOperationSpanBuilder setOperation(final Operation operation) {
|
||||
attributes.put(DB_OPERATION, operation.name());
|
||||
return this;
|
||||
}
|
||||
|
||||
public TableOperationSpanBuilder setTableName(final TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
|
||||
attributes.put(DB_NAME, tableName.getNamespaceAsString());
|
||||
attributes.put(TABLE_KEY, tableName.getNameAsString());
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Span build() {
|
||||
final String name = attributes.getOrDefault(DB_OPERATION, unknown)
|
||||
+ " "
|
||||
+ (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown);
|
||||
final SpanBuilder builder = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder(name)
|
||||
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
|
||||
.setSpanKind(SpanKind.CLIENT);
|
||||
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
|
||||
return builder.startSpan();
|
||||
}
|
||||
|
||||
private static Operation valueFrom(final Scan scan) {
|
||||
if (scan == null) {
|
||||
return null;
|
||||
}
|
||||
return Operation.SCAN;
|
||||
}
|
||||
|
||||
private static Operation valueFrom(final Row row) {
|
||||
if (row == null) {
|
||||
return null;
|
||||
}
|
||||
if (row instanceof Append) {
|
||||
return Operation.APPEND;
|
||||
}
|
||||
if (row instanceof CheckAndMutate) {
|
||||
return Operation.CHECK_AND_MUTATE;
|
||||
}
|
||||
if (row instanceof Delete) {
|
||||
return Operation.DELETE;
|
||||
}
|
||||
if (row instanceof Get) {
|
||||
return Operation.GET;
|
||||
}
|
||||
if (row instanceof Increment) {
|
||||
return Operation.INCREMENT;
|
||||
}
|
||||
if (row instanceof Put) {
|
||||
return Operation.PUT;
|
||||
}
|
||||
if (row instanceof RegionCoprocessorServiceExec) {
|
||||
return Operation.COPROC_EXEC;
|
||||
}
|
||||
if (row instanceof RowMutations) {
|
||||
return Operation.BATCH;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -17,15 +17,24 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
@ -42,14 +51,18 @@ import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.core.IsAnything;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
@ -58,10 +71,8 @@ import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
@ -134,11 +145,17 @@ public class TestAsyncTableTracing {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
ClientProtos.MultiResponse resp =
|
||||
ClientProtos.MultiResponse.newBuilder()
|
||||
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
|
||||
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
|
||||
.build();
|
||||
ClientProtos.MultiRequest req = invocation.getArgument(1);
|
||||
ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
|
||||
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
|
||||
RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
|
||||
for (ClientProtos.Action ignored : regionAction.getActionList()) {
|
||||
raBuilder.addResultOrException(
|
||||
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
|
||||
}
|
||||
builder.addRegionActionResult(raBuilder);
|
||||
}
|
||||
ClientProtos.MultiResponse resp = builder.build();
|
||||
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
|
||||
ForkJoinPool.commonPool().execute(() -> done.run(resp));
|
||||
return null;
|
||||
@ -219,49 +236,73 @@ public class TestAsyncTableTracing {
|
||||
Closeables.close(conn, true);
|
||||
}
|
||||
|
||||
private void assertTrace(String methodName) {
|
||||
Waiter.waitFor(CONF, 1000,
|
||||
() -> traceRule.getSpans().stream()
|
||||
.anyMatch(span -> span.getName().equals("AsyncTable." + methodName) &&
|
||||
span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
|
||||
SpanData data = traceRule.getSpans().stream()
|
||||
.filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get();
|
||||
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
|
||||
TableName tableName = table.getName();
|
||||
assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY));
|
||||
assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY));
|
||||
/**
|
||||
* All {@link Span}s generated from table data access operations over {@code tableName} should
|
||||
* include these attributes.
|
||||
*/
|
||||
static Matcher<SpanData> buildBaseAttributesMatcher(TableName tableName) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("db.name", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
|
||||
containsEntry("db.hbase.table", tableName.getNameAsString())));
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation) {
|
||||
assertTrace(tableOperation, new IsAnything<>());
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
|
||||
final TableName tableName = table.getName();
|
||||
final Matcher<SpanData> spanLocator = allOf(
|
||||
hasName(containsString(tableOperation)), hasEnded());
|
||||
final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
|
||||
|
||||
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
|
||||
"waiting for span to emit",
|
||||
() -> traceRule.getSpans(), hasItem(spanLocator)));
|
||||
SpanData data = traceRule.getSpans()
|
||||
.stream()
|
||||
.filter(spanLocator::matches)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
assertThat(data, allOf(
|
||||
hasName(expectedName),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
buildBaseAttributesMatcher(tableName),
|
||||
matcher));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExists() {
|
||||
table.exists(new Get(Bytes.toBytes(0))).join();
|
||||
assertTrace("get");
|
||||
assertTrace("GET");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() {
|
||||
table.get(new Get(Bytes.toBytes(0))).join();
|
||||
assertTrace("get");
|
||||
assertTrace("GET");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPut() {
|
||||
table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
|
||||
Bytes.toBytes("v"))).join();
|
||||
assertTrace("put");
|
||||
assertTrace("PUT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() {
|
||||
table.delete(new Delete(Bytes.toBytes(0))).join();
|
||||
assertTrace("delete");
|
||||
assertTrace("DELETE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() {
|
||||
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
|
||||
Bytes.toBytes("v"))).join();
|
||||
assertTrace("append");
|
||||
assertTrace("APPEND");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -270,21 +311,21 @@ public class TestAsyncTableTracing {
|
||||
.increment(
|
||||
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
|
||||
.join();
|
||||
assertTrace("increment");
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementColumnValue1() {
|
||||
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
|
||||
.join();
|
||||
assertTrace("increment");
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementColumnValue2() {
|
||||
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
|
||||
Durability.ASYNC_WAL).join();
|
||||
assertTrace("increment");
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -292,7 +333,7 @@ public class TestAsyncTableTracing {
|
||||
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0)))).join();
|
||||
assertTrace("checkAndMutate");
|
||||
assertTrace("CHECK_AND_MUTATE");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -302,7 +343,7 @@ public class TestAsyncTableTracing {
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("checkAndMutateList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -310,7 +351,88 @@ public class TestAsyncTableTracing {
|
||||
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0))))).join();
|
||||
assertTrace("checkAndMutateList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
private void testCheckAndMutateBuilder(Row op) {
|
||||
AsyncTable.CheckAndMutateBuilder builder =
|
||||
table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
|
||||
.qualifier(Bytes.toBytes("cq"))
|
||||
.ifEquals(Bytes.toBytes("v"));
|
||||
if (op instanceof Put) {
|
||||
Put put = (Put) op;
|
||||
builder.thenPut(put).join();
|
||||
} else if (op instanceof Delete) {
|
||||
Delete delete = (Delete) op;
|
||||
builder.thenDelete(delete).join();
|
||||
} else if (op instanceof RowMutations) {
|
||||
RowMutations mutations = (RowMutations) op;
|
||||
builder.thenMutate(mutations).join();
|
||||
} else {
|
||||
fail("unsupported CheckAndPut operation " + op);
|
||||
}
|
||||
assertTrace("CHECK_AND_MUTATE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateBuilderThenPut() {
|
||||
Put put = new Put(Bytes.toBytes(0))
|
||||
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
|
||||
testCheckAndMutateBuilder(put);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateBuilderThenDelete() {
|
||||
testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateBuilderThenMutations() throws IOException {
|
||||
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
|
||||
.add((Mutation) (new Put(Bytes.toBytes(0))
|
||||
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"))))
|
||||
.add((Mutation) new Delete(Bytes.toBytes(0)));
|
||||
testCheckAndMutateBuilder(mutations);
|
||||
}
|
||||
|
||||
private void testCheckAndMutateWithFilterBuilder(Row op) {
|
||||
// use of `PrefixFilter` is completely arbitrary here.
|
||||
AsyncTable.CheckAndMutateWithFilterBuilder builder =
|
||||
table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
|
||||
if (op instanceof Put) {
|
||||
Put put = (Put) op;
|
||||
builder.thenPut(put).join();
|
||||
} else if (op instanceof Delete) {
|
||||
Delete delete = (Delete) op;
|
||||
builder.thenDelete(delete).join();
|
||||
} else if (op instanceof RowMutations) {
|
||||
RowMutations mutations = (RowMutations) op;
|
||||
builder.thenMutate(mutations).join();
|
||||
} else {
|
||||
fail("unsupported CheckAndPut operation " + op);
|
||||
}
|
||||
assertTrace("CHECK_AND_MUTATE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilterBuilderThenPut() {
|
||||
Put put = new Put(Bytes.toBytes(0))
|
||||
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
|
||||
testCheckAndMutateWithFilterBuilder(put);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilterBuilderThenDelete() {
|
||||
testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
|
||||
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
|
||||
.add((Mutation) new Put(Bytes.toBytes(0))
|
||||
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
|
||||
.add((Mutation) new Delete(Bytes.toBytes(0)));
|
||||
testCheckAndMutateWithFilterBuilder(mutations);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -319,13 +441,13 @@ public class TestAsyncTableTracing {
|
||||
RowMutations mutation = new RowMutations(row);
|
||||
mutation.add(new Delete(row));
|
||||
table.mutateRow(mutation).get();
|
||||
assertTrace("mutateRow");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanAll() throws IOException {
|
||||
public void testScanAll() {
|
||||
table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
|
||||
assertTrace("scanAll");
|
||||
assertTrace("SCAN");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -334,13 +456,13 @@ public class TestAsyncTableTracing {
|
||||
.allOf(
|
||||
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("getList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistsAll() {
|
||||
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
||||
assertTrace("getList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -348,13 +470,13 @@ public class TestAsyncTableTracing {
|
||||
CompletableFuture
|
||||
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("getList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAll() {
|
||||
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
|
||||
assertTrace("getList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -363,14 +485,14 @@ public class TestAsyncTableTracing {
|
||||
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("putList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutAll() {
|
||||
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
|
||||
assertTrace("putList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -379,13 +501,13 @@ public class TestAsyncTableTracing {
|
||||
.allOf(
|
||||
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("deleteList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteAll() {
|
||||
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
||||
assertTrace("deleteList");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -394,13 +516,13 @@ public class TestAsyncTableTracing {
|
||||
.allOf(
|
||||
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
|
||||
.join();
|
||||
assertTrace("batch");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchAll() {
|
||||
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
|
||||
assertTrace("batch");
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -17,6 +17,15 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
@ -25,6 +34,9 @@ import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
@ -34,13 +46,17 @@ import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.core.IsAnything;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
@ -217,56 +233,82 @@ public class TestHTableTracing extends TestTracingBase {
|
||||
Closeables.close(conn, true);
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation) {
|
||||
assertTrace(tableOperation, new IsAnything<>());
|
||||
}
|
||||
|
||||
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
|
||||
final TableName tableName = table.getName();
|
||||
final Matcher<SpanData> spanLocator = allOf(
|
||||
hasName(containsString(tableOperation)), hasEnded());
|
||||
final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
|
||||
|
||||
Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
|
||||
"waiting for span to emit",
|
||||
() -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
|
||||
SpanData data = TRACE_RULE.getSpans()
|
||||
.stream()
|
||||
.filter(spanLocator::matches)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
assertThat(data, allOf(
|
||||
hasName(expectedName),
|
||||
hasKind(SpanKind.CLIENT),
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
buildBaseAttributesMatcher(tableName),
|
||||
matcher));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPut() throws IOException {
|
||||
table.put(new Put(Bytes.toBytes(0))
|
||||
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")));
|
||||
assertTrace(HTable.class.getSimpleName(), "put", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("PUT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExists() throws IOException {
|
||||
table.exists(new Get(Bytes.toBytes(0)));
|
||||
assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("GET");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws IOException {
|
||||
table.get(new Get(Bytes.toBytes(0)));
|
||||
assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("GET");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() throws IOException {
|
||||
table.delete(new Delete(Bytes.toBytes(0)));
|
||||
assertTrace(HTable.class.getSimpleName(), "delete", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("DELETE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() throws IOException {
|
||||
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
|
||||
Bytes.toBytes("v")));
|
||||
assertTrace(HTable.class.getSimpleName(), "append", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("APPEND");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws IOException {
|
||||
table.increment(
|
||||
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1));
|
||||
assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementColumnValue1() throws IOException {
|
||||
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1);
|
||||
assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementColumnValue2() throws IOException {
|
||||
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
|
||||
Durability.SYNC_WAL);
|
||||
assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("INCREMENT");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -274,7 +316,7 @@ public class TestHTableTracing extends TestTracingBase {
|
||||
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0))));
|
||||
assertTrace(HTable.class.getSimpleName(), "checkAndMutate", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("CHECK_AND_MUTATE");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -282,8 +324,7 @@ public class TestHTableTracing extends TestTracingBase {
|
||||
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0)))));
|
||||
assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null,
|
||||
TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -291,52 +332,51 @@ public class TestHTableTracing extends TestTracingBase {
|
||||
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
|
||||
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
|
||||
.build(new Delete(Bytes.toBytes(0)))));
|
||||
assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null,
|
||||
TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMutateRow() throws Exception {
|
||||
byte[] row = Bytes.toBytes(0);
|
||||
table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
|
||||
assertTrace(HTable.class.getSimpleName(), "mutateRow", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistsList() throws IOException {
|
||||
table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||
assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistsAll() throws IOException {
|
||||
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||
assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetList() throws IOException {
|
||||
table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
|
||||
assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutList() throws IOException {
|
||||
table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
|
||||
Bytes.toBytes("cq"), Bytes.toBytes("v"))));
|
||||
assertTrace(HTable.class.getSimpleName(), "putList", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteList() throws IOException {
|
||||
table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
|
||||
assertTrace(HTable.class.getSimpleName(), "deleteList", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchList() throws IOException, InterruptedException {
|
||||
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
|
||||
assertTrace(HTable.class.getSimpleName(), "batch", null, TableName.META_TABLE_NAME);
|
||||
assertTrace("BATCH");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.trace.hamcrest;
|
||||
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasProperty;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
|
||||
/**
|
||||
* Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}.
|
||||
*/
|
||||
public final class AttributesMatchers {
|
||||
|
||||
private AttributesMatchers() { }
|
||||
|
||||
public static <T> Matcher<Attributes> containsEntry(
|
||||
Matcher<AttributeKey<? super T>> keyMatcher,
|
||||
Matcher<? super T> valueMatcher
|
||||
) {
|
||||
return new IsAttributesContaining<>(keyMatcher, valueMatcher);
|
||||
}
|
||||
|
||||
public static <T> Matcher<Attributes> containsEntry(AttributeKey<T> key, T value) {
|
||||
return containsEntry(equalTo(key), equalTo(value));
|
||||
}
|
||||
|
||||
public static Matcher<Attributes> containsEntry(String key, String value) {
|
||||
return containsEntry(AttributeKey.stringKey(key), value);
|
||||
}
|
||||
|
||||
private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> {
|
||||
private final Matcher<AttributeKey<? super T>> keyMatcher;
|
||||
private final Matcher<? super T> valueMatcher;
|
||||
|
||||
private IsAttributesContaining(
|
||||
final Matcher<AttributeKey<? super T>> keyMatcher,
|
||||
final Matcher<? super T> valueMatcher
|
||||
) {
|
||||
this.keyMatcher = keyMatcher;
|
||||
this.valueMatcher = valueMatcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(Attributes item) {
|
||||
return item.asMap().entrySet().stream().anyMatch(e -> allOf(
|
||||
hasProperty("key", keyMatcher),
|
||||
hasProperty("value", valueMatcher))
|
||||
.matches(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeMismatchSafely(Attributes item, Description mismatchDescription) {
|
||||
mismatchDescription
|
||||
.appendText("Attributes was ")
|
||||
.appendValueList("[", ", ", "]", item.asMap().entrySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description
|
||||
.appendText("Attributes containing [")
|
||||
.appendDescriptionOf(keyMatcher)
|
||||
.appendText("->")
|
||||
.appendDescriptionOf(valueMatcher)
|
||||
.appendText("]");
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.trace.hamcrest;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.FeatureMatcher;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
|
||||
/**
|
||||
* Helper methods for matching against instances of {@link SpanData}.
|
||||
*/
|
||||
public final class SpanDataMatchers {
|
||||
|
||||
private SpanDataMatchers() { }
|
||||
|
||||
public static Matcher<SpanData> hasAttributes(Matcher<Attributes> matcher) {
|
||||
return new FeatureMatcher<SpanData, Attributes>(
|
||||
matcher, "SpanData having attributes that ", "attributes"
|
||||
) {
|
||||
@Override protected Attributes featureValueOf(SpanData item) {
|
||||
return item.getAttributes();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasEnded() {
|
||||
return new TypeSafeMatcher<SpanData>() {
|
||||
@Override protected boolean matchesSafely(SpanData item) {
|
||||
return item.hasEnded();
|
||||
}
|
||||
@Override public void describeTo(Description description) {
|
||||
description.appendText("SpanData that hasEnded");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasKind(SpanKind kind) {
|
||||
return new FeatureMatcher<SpanData, SpanKind>(
|
||||
equalTo(kind), "SpanData with kind that", "SpanKind") {
|
||||
@Override protected SpanKind featureValueOf(SpanData item) {
|
||||
return item.getKind();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasName(String name) {
|
||||
return hasName(equalTo(name));
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasName(Matcher<String> matcher) {
|
||||
return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a name that", "name") {
|
||||
@Override protected String featureValueOf(SpanData item) {
|
||||
return item.getName();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
|
||||
final Matcher<StatusCode> matcher = is(equalTo(statusCode));
|
||||
return new TypeSafeMatcher<SpanData>() {
|
||||
@Override protected boolean matchesSafely(SpanData item) {
|
||||
final StatusData statusData = item.getStatus();
|
||||
return statusData != null
|
||||
&& statusData.getStatusCode() != null
|
||||
&& matcher.matches(statusData.getStatusCode());
|
||||
}
|
||||
@Override public void describeTo(Description description) {
|
||||
description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -28,7 +28,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class HBaseSemanticAttributes {
|
||||
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
|
||||
public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
|
||||
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
|
||||
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
|
||||
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
|
||||
AttributeKey.stringArrayKey("db.hbase.regions");
|
||||
@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes {
|
||||
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
|
||||
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
|
||||
|
||||
/**
|
||||
* These are values used with {@link #DB_OPERATION}. They correspond with the implementations of
|
||||
* {@code org.apache.hadoop.hbase.client.Operation}, as well as
|
||||
* {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple
|
||||
* operations.
|
||||
*/
|
||||
public enum Operation {
|
||||
APPEND,
|
||||
BATCH,
|
||||
CHECK_AND_MUTATE,
|
||||
COPROC_EXEC,
|
||||
DELETE,
|
||||
GET,
|
||||
INCREMENT,
|
||||
PUT,
|
||||
SCAN,
|
||||
}
|
||||
|
||||
private HBaseSemanticAttributes() { }
|
||||
}
|
||||
|
@ -91,9 +91,11 @@ public final class TraceUtil {
|
||||
/**
|
||||
* Trace an asynchronous operation for a table.
|
||||
*/
|
||||
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
|
||||
String spanName, TableName tableName) {
|
||||
Span span = createTableSpan(spanName, tableName);
|
||||
public static <T> CompletableFuture<T> tracedFuture(
|
||||
Supplier<CompletableFuture<T>> action,
|
||||
Supplier<Span> spanSupplier
|
||||
) {
|
||||
Span span = spanSupplier.get();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
CompletableFuture<T> future = action.get();
|
||||
endSpan(future, span);
|
||||
@ -119,8 +121,10 @@ public final class TraceUtil {
|
||||
* {@code futures} are completed.
|
||||
*/
|
||||
public static <T> List<CompletableFuture<T>> tracedFutures(
|
||||
Supplier<List<CompletableFuture<T>>> action, String spanName, TableName tableName) {
|
||||
Span span = createTableSpan(spanName, tableName);
|
||||
Supplier<List<CompletableFuture<T>>> action,
|
||||
Supplier<Span> spanSupplier
|
||||
) {
|
||||
Span span = spanSupplier.get();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
List<CompletableFuture<T>> futures = action.get();
|
||||
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
|
||||
|
Loading…
x
Reference in New Issue
Block a user