Revert "HBASE-26473 Introduce `db.hbase.container_operations` span attribute"
This reverts commit 7d8dc35249.

parent f278a4c98d
commit 056e8fb88b
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java

@@ -348,8 +348,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(put);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
           .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
@@ -364,8 +363,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(delete);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
           .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@@ -381,8 +379,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       preCheck();
       validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(mutations);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this
           .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
@@ -425,8 +422,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenPut(Put put) {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(put);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
           .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
@@ -441,8 +437,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(delete);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
           .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@@ -457,8 +452,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
       validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
       final Supplier<Span> supplier = newTableOperationSpanBuilder()
-        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
-        .setContainerOperations(mutations);
+        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
       return tracedFuture(
         () -> RawAsyncTableImpl.this
           .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
@@ -480,8 +474,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(checkAndMutate)
-      .setContainerOperations(checkAndMutate.getAction());
+      .setOperation(checkAndMutate);
     return tracedFuture(() -> {
       if (checkAndMutate.getAction() instanceof Put ||
         checkAndMutate.getAction() instanceof Delete ||
@@ -534,8 +527,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   public List<CompletableFuture<CheckAndMutateResult>>
     checkAndMutate(List<CheckAndMutate> checkAndMutates) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(checkAndMutates)
-      .setContainerOperations(checkAndMutates);
+      .setOperation(checkAndMutates);
     return tracedFutures(
       () -> batch(checkAndMutates, rpcTimeoutNs).stream()
         .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
@@ -591,8 +583,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     long nonceGroup = conn.getNonceGenerator().getNonceGroup();
     long nonce = conn.getNonceGenerator().newNonce();
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(mutations)
-      .setContainerOperations(mutations);
+      .setOperation(mutations);
     return tracedFuture(
       () -> this
         .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
@@ -665,32 +656,28 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public List<CompletableFuture<Result>> get(List<Get> gets) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(gets)
-      .setContainerOperations(HBaseSemanticAttributes.Operation.GET);
+      .setOperation(gets);
     return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
   }

   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(puts)
-      .setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
+      .setOperation(puts);
     return tracedFutures(() -> voidMutate(puts), supplier);
   }

   @Override
   public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(deletes)
-      .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
+      .setOperation(deletes);
     return tracedFutures(() -> voidMutate(deletes), supplier);
   }

   @Override
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
     final Supplier<Span> supplier = newTableOperationSpanBuilder()
-      .setOperation(actions)
-      .setContainerOperations(actions);
+      .setOperation(actions);
     return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
   }
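Every hunk above follows the same shape: build a Supplier<Span> and hand it, together with the async work, to tracedFuture/tracedFutures. The sketch below is a hedged, generic illustration of that pattern using only the OpenTelemetry API; the method name tracedFuture mirrors the diff, but the body is an assumption made for illustration, not HBase's actual TraceUtil implementation, and the span name is hypothetical.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class TracedFutureSketch {
  // Start the span supplied by `spanSupplier`, run the async action inside its scope,
  // and end the span when the returned future completes (recording any failure).
  static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
    Supplier<Span> spanSupplier) {
    Span span = spanSupplier.get();
    try (Scope ignored = span.makeCurrent()) {
      CompletableFuture<T> future = action.get();
      future.whenComplete((result, error) -> {
        if (error != null) {
          span.recordException(error);
        }
        span.end();
      });
      return future;
    }
  }

  public static void main(String[] args) {
    // Hypothetical span name; HBase composes it from the operation and the table name.
    Supplier<Span> supplier = () -> GlobalOpenTelemetry.getTracer("sketch")
      .spanBuilder("BATCH default.test").startSpan();
    tracedFuture(() -> CompletableFuture.completedFuture("ok"), supplier).join();
  }
}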
hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java

@@ -18,22 +18,15 @@

 package org.apache.hadoop.hbase.client.trace;

-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
 import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanBuilder;
 import io.opentelemetry.api.trace.SpanKind;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
@@ -92,72 +85,6 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
     return this;
   }

-  // `setContainerOperations` perform a recursive descent expansion of all the operations
-  // contained within the provided "batch" object.
-
-  public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
-    final Operation[] ops = mutations.getMutations()
-      .stream()
-      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
-      .toArray(Operation[]::new);
-    return setContainerOperations(ops);
-  }
-
-  public TableOperationSpanBuilder setContainerOperations(final Row row) {
-    final Operation[] ops =
-      Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
-        .toArray(Operation[]::new);
-    return setContainerOperations(ops);
-  }
-
-  public TableOperationSpanBuilder setContainerOperations(
-    final Collection<? extends Row> operations
-  ) {
-    final Operation[] ops = operations.stream()
-      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
-      .toArray(Operation[]::new);
-    return setContainerOperations(ops);
-  }
-
-  private static Set<Operation> unpackRowOperations(final Row row) {
-    final Set<Operation> ops = new HashSet<>();
-    if (row instanceof CheckAndMutate) {
-      final CheckAndMutate cam = (CheckAndMutate) row;
-      ops.addAll(unpackRowOperations(cam));
-    }
-    if (row instanceof RowMutations) {
-      final RowMutations mutations = (RowMutations) row;
-      ops.addAll(unpackRowOperations(mutations));
-    }
-    return ops;
-  }
-
-  private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
-    final Set<Operation> ops = new HashSet<>();
-    final Operation op = valueFrom(cam.getAction());
-    switch (op) {
-      case BATCH:
-      case CHECK_AND_MUTATE:
-        ops.addAll(unpackRowOperations(cam.getAction()));
-        break;
-      default:
-        ops.add(op);
-    }
-    return ops;
-  }
-
-  public TableOperationSpanBuilder setContainerOperations(
-    final Operation... operations
-  ) {
-    final List<String> ops = Arrays.stream(operations)
-      .map(op -> op == null ? unknown : op.name())
-      .sorted()
-      .distinct()
-      .collect(Collectors.toList());
-    attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
-    return this;
-  }
-
   public TableOperationSpanBuilder setTableName(final TableName tableName) {
     this.tableName = tableName;
     TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
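The comment removed above describes setContainerOperations as a recursive-descent expansion of the operations nested inside a "batch" object (for example, a CheckAndMutate wrapping its action). Below is a minimal standalone sketch of that idea; the stand-in types are hypothetical and deliberately much simpler than HBase's Row/Mutation hierarchy.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ContainerExpansionSketch {
  // Stand-in operation model (hypothetical; not HBase's Row hierarchy).
  interface Op {}
  static class Put implements Op {}
  static class Delete implements Op {}
  static class CheckAndMutate implements Op {
    final Op action;
    CheckAndMutate(Op action) { this.action = action; }
  }

  // Recursive descent: emit the container's own name, then expand what it ships.
  static Stream<String> expand(Op op) {
    if (op instanceof CheckAndMutate) {
      return Stream.concat(Stream.of("CHECK_AND_MUTATE"), expand(((CheckAndMutate) op).action));
    }
    return Stream.of(op.getClass().getSimpleName().toUpperCase());
  }

  public static void main(String[] args) {
    List<Op> batch = Arrays.asList(new CheckAndMutate(new Delete()), new Put());
    // Sorted, de-duplicated names, mirroring the removed Operation... overload.
    List<String> ops = batch.stream()
      .flatMap(ContainerExpansionSketch::expand)
      .sorted()
      .distinct()
      .collect(Collectors.toList());
    System.out.println(ops); // [CHECK_AND_MUTATE, DELETE, PUT]
  }
}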
hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;

-import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
-import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
@@ -335,9 +333,7 @@ public class TestAsyncTableTracing {
         .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
         .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf(
-        "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
+    assertTrace("BATCH");
   }

   @Test
@@ -345,9 +341,7 @@ public class TestAsyncTableTracing {
     table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
       .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
       .build(new Delete(Bytes.toBytes(0))))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf(
-        "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
+    assertTrace("BATCH");
   }

   private void testCheckAndMutateBuilder(Row op) {
@@ -433,13 +427,8 @@ public class TestAsyncTableTracing {

   @Test
   public void testMutateRow() throws IOException {
-    final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
-      .add(new Put(Bytes.toBytes(0))
-        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
-      .add(new Delete(Bytes.toBytes(0)));
-    table.mutateRow(mutations).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
+    table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
+    assertTrace("BATCH");
   }

   @Test
@@ -454,15 +443,13 @@ public class TestAsyncTableTracing {
       .allOf(
         table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
+    assertTrace("BATCH");
   }

   @Test
   public void testExistsAll() {
     table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
+    assertTrace("BATCH");
   }

   @Test
@@ -470,15 +457,13 @@ public class TestAsyncTableTracing {
     CompletableFuture
       .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
+    assertTrace("BATCH");
   }

   @Test
   public void testGetAll() {
     table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
+    assertTrace("BATCH");
   }

   @Test
@@ -487,16 +472,14 @@ public class TestAsyncTableTracing {
       .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
         Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
+    assertTrace("BATCH");
   }

   @Test
   public void testPutAll() {
     table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
       Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
+    assertTrace("BATCH");
   }

   @Test
@@ -505,15 +488,13 @@ public class TestAsyncTableTracing {
       .allOf(
         table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
+    assertTrace("BATCH");
   }

   @Test
   public void testDeleteAll() {
     table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
+    assertTrace("BATCH");
   }

   @Test
@@ -522,14 +503,12 @@ public class TestAsyncTableTracing {
       .allOf(
         table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
       .join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
+    assertTrace("BATCH");
   }

   @Test
   public void testBatchAll() {
     table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
-    assertTrace("BATCH", hasAttributes(
-      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
+    assertTrace("BATCH");
   }
 }
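The deleted assertions above verified the attribute through hamcrest matchers (hasAttributes, containsEntryWithStringValuesOf). Below is a rough sketch of the same kind of check done directly against finished spans with the OpenTelemetry SDK's in-memory testing exporter; it assumes opentelemetry-sdk-testing on the classpath, uses illustrative names, and is not the project's actual test harness.

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.Arrays;
import java.util.List;

public class ContainerOpsAssertionSketch {
  public static void main(String[] args) {
    AttributeKey<List<String>> key = AttributeKey.stringArrayKey("db.hbase.container_operations");
    // Capture finished spans in memory so their attributes can be inspected.
    InMemorySpanExporter exporter = InMemorySpanExporter.create();
    SdkTracerProvider provider = SdkTracerProvider.builder()
      .addSpanProcessor(SimpleSpanProcessor.create(exporter))
      .build();
    Tracer tracer = provider.get("sketch");

    Span span = tracer.spanBuilder("BATCH default.test")
      .setAttribute(key, Arrays.asList("CHECK_AND_MUTATE", "DELETE"))
      .startSpan();
    span.end();

    SpanData data = exporter.getFinishedSpanItems().get(0);
    // Before this revert the test expected the attribute; after it, the attribute is absent.
    System.out.println(data.getAttributes().get(key)); // [CHECK_AND_MUTATE, DELETE]
    provider.close();
  }
}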
hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java

@@ -36,12 +36,6 @@ public final class HBaseSemanticAttributes {
   public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
   public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
   public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
-  /**
-   * For operations that themselves ship one or more operations, such as
-   * {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}.
-   */
-  public static final AttributeKey<List<String>> CONTAINER_DB_OPERATIONS_KEY =
-    AttributeKey.stringArrayKey("db.hbase.container_operations");
   public static final AttributeKey<List<String>> REGION_NAMES_KEY =
     AttributeKey.stringArrayKey("db.hbase.regions");
   public static final AttributeKey<String> RPC_SERVICE_KEY =
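The removed javadoc explains the key: a string-array attribute listing the operations that a container operation (BATCH, CHECK_AND_MUTATE) ships. For illustration only, a small sketch of how such a key is declared and read back with the OpenTelemetry attributes API; the constant name simply mirrors the one removed above.

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.Arrays;
import java.util.List;

public final class ContainerOperationsKeySketch {
  // Re-creation of the key this revert removes, for illustration only.
  static final AttributeKey<List<String>> CONTAINER_DB_OPERATIONS_KEY =
    AttributeKey.stringArrayKey("db.hbase.container_operations");

  public static void main(String[] args) {
    Attributes attrs = Attributes.builder()
      .put(CONTAINER_DB_OPERATIONS_KEY, Arrays.asList("DELETE", "PUT"))
      .build();
    System.out.println(attrs.get(CONTAINER_DB_OPERATIONS_KEY)); // [DELETE, PUT]
  }
}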