From 7d8dc3524981a04e82ff236bb3dc18d36ebc82b4 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 18 Nov 2021 09:24:45 -0800 Subject: [PATCH] HBASE-26473 Introduce `db.hbase.container_operations` span attribute For batch operations, collect and annotate the associated span with the set of all operations contained in the batch. Signed-off-by: Duo Zhang --- .../hbase/client/RawAsyncTableImpl.java | 39 ++++++---- .../trace/TableOperationSpanBuilder.java | 73 +++++++++++++++++++ .../hbase/client/TestAsyncTableTracing.java | 49 +++++++++---- .../hbase/trace/HBaseSemanticAttributes.java | 6 ++ 4 files changed, 140 insertions(+), 27 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index ef4081f7e8a..655ab230645 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -348,7 +348,8 @@ class RawAsyncTableImpl implements AsyncTable { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(put); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, @@ -363,7 +364,8 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenDelete(Delete delete) { preCheck(); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(delete); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -379,7 +381,8 @@ class RawAsyncTableImpl implements AsyncTable { preCheck(); validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(mutations); return tracedFuture( () -> RawAsyncTableImpl.this . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) @@ -422,7 +425,8 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(put); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, @@ -437,7 +441,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenDelete(Delete delete) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(delete); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -452,7 +457,8 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenMutate(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(mutations); return tracedFuture( () -> RawAsyncTableImpl.this . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) @@ -474,7 +480,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(checkAndMutate); + .setOperation(checkAndMutate) + .setContainerOperations(checkAndMutate.getAction()); return tracedFuture(() -> { if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || @@ -527,7 +534,8 @@ class RawAsyncTableImpl implements AsyncTable { public List> checkAndMutate(List checkAndMutates) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(checkAndMutates); + .setOperation(checkAndMutates) + .setContainerOperations(checkAndMutates); return tracedFutures( () -> batch(checkAndMutates, rpcTimeoutNs).stream() .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), @@ -583,7 +591,8 @@ class RawAsyncTableImpl implements AsyncTable { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(mutations); + .setOperation(mutations) + .setContainerOperations(mutations); return tracedFuture( () -> this . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) @@ -656,28 +665,32 @@ class RawAsyncTableImpl implements AsyncTable { @Override public List> get(List gets) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(gets); + .setOperation(gets) + .setContainerOperations(HBaseSemanticAttributes.Operation.GET); return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); } @Override public List> put(List puts) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(puts); + .setOperation(puts) + .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); return tracedFutures(() -> voidMutate(puts), supplier); } @Override public List> delete(List deletes) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(deletes); + .setOperation(deletes) + .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); return tracedFutures(() -> voidMutate(deletes), supplier); } @Override public List> batch(List actions) { final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(actions); + .setOperation(actions) + .setContainerOperations(actions); return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java index de7b700f4e5..2171afe768b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -18,15 +18,22 @@ package org.apache.hadoop.hbase.client.trace; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanKind; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.AsyncConnectionImpl; @@ -85,6 +92,72 @@ public class TableOperationSpanBuilder implements Supplier { return this; } + // `setContainerOperations` perform a recursive descent expansion of all the operations + // contained within the provided "batch" object. + + public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) { + final Operation[] ops = mutations.getMutations() + .stream() + .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + public TableOperationSpanBuilder setContainerOperations(final Row row) { + final Operation[] ops = + Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + public TableOperationSpanBuilder setContainerOperations( + final Collection operations + ) { + final Operation[] ops = operations.stream() + .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + private static Set unpackRowOperations(final Row row) { + final Set ops = new HashSet<>(); + if (row instanceof CheckAndMutate) { + final CheckAndMutate cam = (CheckAndMutate) row; + ops.addAll(unpackRowOperations(cam)); + } + if (row instanceof RowMutations) { + final RowMutations mutations = (RowMutations) row; + ops.addAll(unpackRowOperations(mutations)); + } + return ops; + } + + private static Set unpackRowOperations(final CheckAndMutate cam) { + final Set ops = new HashSet<>(); + final Operation op = valueFrom(cam.getAction()); + switch (op) { + case BATCH: + case CHECK_AND_MUTATE: + ops.addAll(unpackRowOperations(cam.getAction())); + break; + default: + ops.add(op); + } + return ops; + } + + public TableOperationSpanBuilder setContainerOperations( + final Operation... operations + ) { + final List ops = Arrays.stream(operations) + .map(op -> op == null ? unknown : op.name()) + .sorted() + .distinct() + .collect(Collectors.toList()); + attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops); + return this; + } + public TableOperationSpanBuilder setTableName(final TableName tableName) { this.tableName = tableName; TableSpanBuilder.populateTableNameAttributes(attributes, tableName); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index d8a645349cb..8df074e68e4 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; @@ -333,7 +335,9 @@ public class TestAsyncTableTracing { .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf( + "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE"))); } @Test @@ -341,7 +345,9 @@ public class TestAsyncTableTracing { table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf( + "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE"))); } private void testCheckAndMutateBuilder(Row op) { @@ -427,8 +433,13 @@ public class TestAsyncTableTracing { @Test public void testMutateRow() throws IOException { - table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0)))); - assertTrace("BATCH"); + final RowMutations mutations = new RowMutations(Bytes.toBytes(0)) + .add(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .add(new Delete(Bytes.toBytes(0))); + table.mutateRow(mutations).join(); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT"))); } @Test @@ -443,13 +454,15 @@ public class TestAsyncTableTracing { .allOf( table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test public void testExistsAll() { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test @@ -457,13 +470,15 @@ public class TestAsyncTableTracing { CompletableFuture .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test public void testGetAll() { table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test @@ -472,14 +487,16 @@ public class TestAsyncTableTracing { .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); } @Test public void testPutAll() { table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); } @Test @@ -488,13 +505,15 @@ public class TestAsyncTableTracing { .allOf( table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test public void testDeleteAll() { table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test @@ -503,12 +522,14 @@ public class TestAsyncTableTracing { .allOf( table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index fd6ab852e06..1a74fdcd65a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -36,6 +36,12 @@ public final class HBaseSemanticAttributes { public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME; public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); + /** + * For operations that themselves ship one or more operations, such as + * {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}. + */ + public static final AttributeKey> CONTAINER_DB_OPERATIONS_KEY = + AttributeKey.stringArrayKey("db.hbase.container_operations"); public static final AttributeKey> REGION_NAMES_KEY = AttributeKey.stringArrayKey("db.hbase.regions"); public static final AttributeKey RPC_SERVICE_KEY =