diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 96c650f6949..f6c7d82b020 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; - +import io.opentelemetry.context.Context; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; /** @@ -280,26 +279,27 @@ class AsyncTableImpl implements AsyncTable { public CoprocessorServiceBuilder coprocessorService( Function stubMaker, ServiceCaller callable, CoprocessorCallback callback) { + final Context context = Context.current(); CoprocessorCallback wrappedCallback = new CoprocessorCallback() { @Override public void onRegionComplete(RegionInfo region, R resp) { - pool.execute(() -> callback.onRegionComplete(region, resp)); + pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp))); } @Override public void onRegionError(RegionInfo region, Throwable error) { - pool.execute(() -> callback.onRegionError(region, error)); + pool.execute(context.wrap(() -> callback.onRegionError(region, error))); } @Override public void onComplete() { - pool.execute(() -> callback.onComplete()); + pool.execute(context.wrap(callback::onComplete)); } @Override public void onError(Throwable error) { - pool.execute(() -> callback.onError(error)); + pool.execute(context.wrap(() -> callback.onError(error))); } }; CoprocessorServiceBuilder builder = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index a144550022b..9c283ca021a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -26,8 +26,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMu import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -51,17 +52,16 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -755,14 +755,22 @@ class RawAsyncTableImpl implements AsyncTable { ServiceCaller callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs); + final Span span = Span.current(); S stub = stubMaker.apply(channel); CompletableFuture future = new CompletableFuture<>(); ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); callable.call(stub, controller, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - future.complete(resp); + try (Scope ignored = span.makeCurrent()) { + if (controller.failed()) { + final Throwable failure = controller.getFailed(); + future.completeExceptionally(failure); + TraceUtil.setError(span, failure); + } else { + future.complete(resp); + span.setStatus(StatusCode.OK); + } + } finally { + span.end(); } }); return future; @@ -795,8 +803,11 @@ class RawAsyncTableImpl implements AsyncTable { ServiceCaller callable, CoprocessorCallback callback, List locs, byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { + final Span span = Span.current(); if (error != null) { callback.onError(error); + TraceUtil.setError(span, error); + span.end(); return; } unfinishedRequest.incrementAndGet(); @@ -807,17 +818,23 @@ class RawAsyncTableImpl implements AsyncTable { addListener( conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, operationTimeoutNs), - (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, - locateFinished, unfinishedRequest, l, e)); + (l, e) -> { + try (Scope ignored = span.makeCurrent()) { + onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, + locateFinished, unfinishedRequest, l, e); + } + }); } addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { - if (e != null) { - callback.onRegionError(region, e); - } else { - callback.onRegionComplete(region, r); - } - if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { - callback.onComplete(); + try (Scope ignored = span.makeCurrent()) { + if (e != null) { + callback.onRegionError(region, e); + } else { + callback.onRegionComplete(region, r); + } + if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { + callback.onComplete(); + } } }); } @@ -868,10 +885,22 @@ class RawAsyncTableImpl implements AsyncTable { @Override public void execute() { - addListener(conn.getLocator().getRegionLocation(tableName, startKey, - startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), - (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, - endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + final Span span = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC) + .build(); + try (Scope ignored = span.makeCurrent()) { + final RegionLocateType regionLocateType = startKeyInclusive + ? RegionLocateType.CURRENT + : RegionLocateType.AFTER; + final CompletableFuture future = conn.getLocator() + .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); + addListener(future, (loc, error) -> { + try (Scope ignored1 = span.makeCurrent()) { + onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, + endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); + } + }); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java index 63cc0bbfba9..752aa29e7cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -30,13 +31,11 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; - import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; @@ -74,6 +73,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel { private CompletableFuture rpcCall(MethodDescriptor method, Message request, Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { + final Context context = Context.current(); CompletableFuture future = new CompletableFuture<>(); if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) { future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " + @@ -82,39 +82,43 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel { } CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request, row, loc.getRegion().getRegionName()); - stub.execService(controller, csr, - new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback() { - - @Override - public void run(CoprocessorServiceResponse resp) { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - lastRegion = resp.getRegion().getValue().toByteArray(); - try { - future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); - } catch (IOException e) { - future.completeExceptionally(e); - } + stub.execService(controller, csr, resp -> { + try (Scope ignored = context.makeCurrent()) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + lastRegion = resp.getRegion().getValue().toByteArray(); + try { + future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); + } catch (IOException e) { + future.completeExceptionally(e); } } - }); + } + }); return future; } @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { + final Context context = Context.current(); addListener( conn.callerFactory. single().table(tableName).row(row) .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(), + .action((c, l, s) -> { + try (Scope ignored = context.makeCurrent()) { + return rpcCall(method, request, responsePrototype, c, l, s); + } + }).call(), (r, e) -> { - if (e != null) { - setCoprocessorError(controller, e); + try (Scope ignored = context.makeCurrent()) { + if (e != null) { + setCoprocessorError(controller, e); + } + done.run(r); } - done.run(r); }); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 1260f313cf5..0c818f8becb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -18,14 +18,15 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError; - +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; @@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; @@ -46,9 +48,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -57,7 +62,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -65,7 +69,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** @@ -458,25 +461,22 @@ class TableOverAsyncTable implements Table { R call(RegionCoprocessorRpcChannel channel) throws Exception; } - private void coprocssorService(String serviceName, byte[] startKey, byte[] endKey, + private void coprocessorService(String serviceName, byte[] startKey, byte[] endKey, Callback callback, StubCall call) throws Throwable { // get regions covered by the row range - ExecutorService pool = this.poolSupplier.get(); + ExecutorService pool = Context.current().wrap(this.poolSupplier.get()); List keys = getStartKeysInRange(startKey, endKey); Map> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); try { for (byte[] r : keys) { RegionCoprocessorRpcChannel channel = coprocessorService(r); - Future future = pool.submit(new Callable() { - @Override - public R call() throws Exception { - R result = call.call(channel); - byte[] region = channel.getLastRegion(); - if (callback != null) { - callback.update(region, r, result); - } - return result; + Future future = pool.submit(() -> { + R result = call.call(channel); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); } + return result; }); futures.put(r, future); } @@ -492,7 +492,7 @@ class TableOverAsyncTable implements Table { try { e.getValue().get(); } catch (ExecutionException ee) { - LOG.warn("Error calling coprocessor service " + serviceName + " for row " + + LOG.warn("Error calling coprocessor service {} for row {}", serviceName, Bytes.toStringBinary(e.getKey()), ee); throw ee.getCause(); } catch (InterruptedException ie) { @@ -505,10 +505,18 @@ class TableOverAsyncTable implements Table { @Override public void coprocessorService(Class service, byte[] startKey, byte[] endKey, Call callable, Callback callback) throws ServiceException, Throwable { - coprocssorService(service.getName(), startKey, endKey, callback, channel -> { - T instance = ProtobufUtil.newServiceStub(service, channel); - return callable.call(instance); - }); + final Supplier supplier = new TableOperationSpanBuilder(conn) + .setTableName(table.getName()) + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); + TraceUtil.trace(() -> { + final Context context = Context.current(); + coprocessorService(service.getName(), startKey, endKey, callback, channel -> { + try (Scope ignored = context.makeCurrent()) { + T instance = ProtobufUtil.newServiceStub(service, channel); + return callable.call(instance); + } + }); + }, supplier); } @SuppressWarnings("unchecked") @@ -516,9 +524,18 @@ class TableOverAsyncTable implements Table { public void batchCoprocessorService(MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback callback) throws ServiceException, Throwable { - coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> { - return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype); - }); + final Supplier supplier = new TableOperationSpanBuilder(conn) + .setTableName(table.getName()) + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); + TraceUtil.trace(() -> { + final Context context = Context.current(); + coprocessorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> { + try (Scope ignored = context.makeCurrent()) { + return (R) channel.callBlockingMethod( + methodDescriptor, null, request, responsePrototype); + } + }); + }, supplier); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java new file mode 100644 index 00000000000..2c7061259f9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace; + +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Rudimentary tool for visualizing a hierarchy of spans. Given a collection of spans, indexes + * them from parents to children and prints them out one per line, indented. + */ +@InterfaceAudience.Private +public class StringTraceRenderer { + private static final Logger logger = LoggerFactory.getLogger(StringTraceRenderer.class); + + private final List graphs; + + public StringTraceRenderer(final Collection spans) { + final Map spansById = indexSpansById(spans); + populateChildren(spansById); + graphs = findRoots(spansById); + } + + private static Map indexSpansById(final Collection spans) { + final Map spansById = new HashMap<>(spans.size()); + spans.forEach(span -> spansById.put(span.getSpanId(), new Node(span))); + return spansById; + } + + private static void populateChildren(final Map spansById) { + spansById.forEach((spanId, node) -> { + final SpanData spanData = node.spanData; + final String parentSpanId = spanData.getParentSpanId(); + if (Objects.equals(parentSpanId, SpanId.getInvalid())) { + return; + } + final Node parentNode = spansById.get(parentSpanId); + if (parentNode == null) { + logger.warn("Span {} has parent {} that is not found in index, {}", spanId, parentSpanId, + spanData); + return; + } + parentNode.children.put(spanId, node); + }); + } + + private static List findRoots(final Map spansById) { + return spansById.values() + .stream() + .filter(node -> Objects.equals(node.spanData.getParentSpanId(), SpanId.getInvalid())) + .collect(Collectors.toList()); + } + + public void render(final Consumer writer) { + for (ListIterator iter = graphs.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node node = iter.next(); + render(writer, node, 0, idx == 0); + } + } + + private static void render( + final Consumer writer, + final Node node, + final int indent, + final boolean isFirst + ) { + writer.accept(render(node.spanData, indent, isFirst)); + final List children = new ArrayList<>(node.children.values()); + for (ListIterator iter = children.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node child = iter.next(); + render(writer, child, indent + 2, idx == 0); + } + } + + private static String render( + final SpanData spanData, + final int indent, + final boolean isFirst + ) { + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < indent; i++) { + sb.append(' '); + } + + return sb.append(isFirst ? "└─ " : "├─ ") + .append(render(spanData)) + .toString(); + } + + private static String render(final SpanData spanData) { + return new ToStringBuilder(spanData, ToStringStyle.NO_CLASS_NAME_STYLE) + .append("spanId", spanData.getSpanId()) + .append("name", spanData.getName()) + .append("hasEnded", spanData.hasEnded()) + .toString(); + } + + private static class Node { + final SpanData spanData; + final LinkedHashMap children; + + Node(final SpanData spanData) { + this.spanData = spanData; + this.children = new LinkedHashMap<>(); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java index a9473dae559..01aa61805a2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -99,6 +99,24 @@ public final class SpanDataMatchers { }; } + public static Matcher hasParentSpanId(String parentSpanId) { + return hasParentSpanId(equalTo(parentSpanId)); + } + + public static Matcher hasParentSpanId(SpanData parent) { + return hasParentSpanId(parent.getSpanId()); + } + + public static Matcher hasParentSpanId(Matcher matcher) { + return new FeatureMatcher(matcher, "SpanKind with a parentSpanId that", + "parentSpanId" + ) { + @Override protected String featureValueOf(SpanData item) { + return item.getParentSpanId(); + } + }; + } + public static Matcher hasStatusWithCode(StatusCode statusCode) { final Matcher matcher = is(equalTo(statusCode)); return new TypeSafeMatcher() { diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index 2b3b98c2701..ab1a4e3bdef 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -109,6 +109,12 @@ org.apache.hbase hbase-client + + org.apache.hbase + hbase-client + test-jar + test + org.apache.hbase @@ -213,6 +219,23 @@ log4j-1.2-api test + + + org.hamcrest + hamcrest-library + test + + + io.opentelemetry + opentelemetry-sdk-trace + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java new file mode 100644 index 00000000000..f91db84e8c8 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.everyItem; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasProperty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ConnectionRule; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MatcherPredicate; +import org.apache.hadoop.hbase.MiniClusterRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.ServiceCaller; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; +import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; + +/** + * Test cases to verify tracing coprocessor Endpoint execution + */ +@Category({ CoprocessorTests.class, MediumTests.class}) +public class TestCoprocessorEndpointTracing { + private static final Logger logger = + LoggerFactory.getLogger(TestCoprocessorEndpointTracing.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCoprocessorEndpointTracing.class); + + private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); + private static final MiniClusterRule miniclusterRule = MiniClusterRule.newBuilder() + .setConfiguration(() -> { + final Configuration conf = new Configuration(); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + return conf; + }) + .build(); + private static final ConnectionRule connectionRule = + new ConnectionRule(miniclusterRule::createConnection); + + private static final class Setup extends ExternalResource { + @Override + protected void before() throws Throwable { + final HBaseTestingUtil util = miniclusterRule.getTestingUtility(); + final AsyncConnection connection = connectionRule.getConnection(); + final AsyncAdmin admin = connection.getAdmin(); + final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build(); + admin.createTable(tableDescriptor).get(); + util.waitUntilAllRegionsAssigned(TEST_TABLE); + } + } + + @ClassRule + public static final TestRule testRule = RuleChain.outerRule(otelClassRule) + .around(miniclusterRule) + .around(connectionRule) + .around(new Setup()); + + private static final TableName TEST_TABLE = + TableName.valueOf(TestCoprocessorEndpointTracing.class.getSimpleName()); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + + @Rule + public OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); + + @Rule + public TestName testName = new TestName(); + + @Test + public void traceAsyncTableEndpoint() { + final AsyncConnection connection = connectionRule.getConnection(); + final AsyncTable table = connection.getTable(TEST_TABLE); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CompletableFuture> future = new CompletableFuture<>(); + final AsyncTable.CoprocessorCallback callback = + new AsyncTable.CoprocessorCallback() { + final ConcurrentMap results = new ConcurrentHashMap<>(); + + @Override + public void onRegionComplete(RegionInfo region, EchoResponseProto resp) { + if (!future.isDone()) { + results.put(region.getRegionName(), resp.getMessage()); + } + } + + @Override + public void onRegionError(RegionInfo region, Throwable error) { + if (!future.isDone()) { + future.completeExceptionally(error); + } + } + + @Override + public void onComplete() { + if (!future.isDone()) { + future.complete(results); + } + } + + @Override + public void onError(Throwable error) { + if (!future.isDone()) { + future.completeExceptionally(error); + } + } + }; + + final Map results = TraceUtil.trace(() -> { + table.coprocessorService(TestProtobufRpcProto::newStub, + (stub, controller, cb) -> stub.echo(controller, request, cb), callback) + .execute(); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + equalTo("hello")))); + + final Matcher parentMatcher = allOf(hasName(testName.getMethodName()), hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableEndpointCall() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final RpcController controller = new ServerRpcController(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CoprocessorRpcUtils.BlockingRpcCallback callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + final Map results = TraceUtil.trace(() -> { + try { + return table.coprocessorService(TestProtobufRpcProto.class, null, null, + t -> { + t.echo(controller, request, callback); + return callback.get(); + }); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableEndpointCallAndCallback() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final RpcController controller = new ServerRpcController(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CoprocessorRpcUtils.BlockingRpcCallback callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + final ConcurrentMap results = new ConcurrentHashMap<>(); + TraceUtil.trace(() -> { + try { + table.coprocessorService(TestProtobufRpcProto.class, null, null, + t -> { + t.echo(controller, request, callback); + return callback.get(); + }, + (region, row, result) -> { + results.put(region, result); + }); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final EchoResponseProto response = TraceUtil.trace(() -> { + try { + final CoprocessorRpcChannel channel = table.coprocessorService(new byte[] {}); + final TestProtobufRpcProto.BlockingInterface service = + TestProtobufRpcProto.newBlockingStub(channel); + return service.echo(null, request); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(response); + assertEquals("hello", response.getMessage()); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + /* + * This interface is really low level: it returns a Channel and expects the caller to invoke it. + * The Table instance isn't issuing a command here, it's not a table operation, so don't expect + * there to be a span like `COPROC_EXEC table`. + */ + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan)); + assertThat(spans, not(hasItem(tableOpMatcher))); + } + + @Test + public void traceSyncTableBatchEndpoint() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final Descriptors.MethodDescriptor descriptor = + TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final Map response = TraceUtil.trace(() -> { + try { + return table.batchCoprocessorService( + descriptor, request, null, null, EchoResponseProto.getDefaultInstance()); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(response); + assertThat(response.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableBatchEndpointCallback() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final Descriptors.MethodDescriptor descriptor = + TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final ConcurrentMap results = new ConcurrentHashMap<>(); + TraceUtil.trace(() -> { + try { + table.batchCoprocessorService(descriptor, request, null, null, + EchoResponseProto.getDefaultInstance(), (region, row, res) -> results.put(region, res)); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceAsyncAdminEndpoint() throws Exception { + final AsyncConnection connection = connectionRule.getConnection(); + final AsyncAdmin admin = connection.getAdmin(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final ServiceCaller callback = + (stub, controller, cb) -> stub.echo(controller, request, cb); + + final String response = TraceUtil.tracedFuture( + () -> admin.coprocessorService(TestProtobufRpcProto::newStub, callback), + testName.getMethodName()) + .get() + .getMessage(); + assertEquals("hello", response); + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.MasterService/ExecMasterService"), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncAdminEndpoint() throws Exception { + final Connection connection = connectionRule.getConnection().toConnection(); + try (final Admin admin = connection.getAdmin()) { + final TestProtobufRpcProto.BlockingInterface service = + TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final String response = TraceUtil.trace(() -> { + try { + return service.echo(null, request).getMessage(); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + }, testName.getMethodName()); + assertEquals("hello", response); + } + + final Matcher parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher rpcMatcher = allOf( + hasName("hbase.pb.MasterService/ExecMasterService"), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + private void waitForAndLog(Matcher spanMatcher) { + final Configuration conf = connectionRule.getConnection().getConfiguration(); + Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( + otelClassRule::getSpans, hasItem(spanMatcher))); + final List spans = otelClassRule.getSpans(); + if (logger.isDebugEnabled()) { + StringTraceRenderer renderer = new StringTraceRenderer(spans); + renderer.render(logger::debug); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java new file mode 100644 index 00000000000..3bbf2d445a8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import org.junit.rules.ExternalResource; + +/** + *

Like {@link OpenTelemetryRule}, except modeled after the junit5 implementation + * {@code OpenTelemetryExtension}. Use this class when you need to make asserts on {@link SpanData} + * created on a MiniCluster. Make sure this rule initialized before the MiniCluster so that it can + * register its instance of {@link OpenTelemetry} as the global instance before any server-side + * component can call {@link TraceUtil#getGlobalTracer()}.

+ *

For example:

+ *
{@code
+ * public class TestMyClass {
+ *   private static final OpenTelemetryClassRule otelClassRule =
+ *     OpenTelemetryClassRule.create();
+ *   private static final MiniClusterRule miniClusterRule =
+ *     MiniClusterRule.newBuilder().build();
+ *   protected static final ConnectionRule connectionRule =
+ *     new ConnectionRule(miniClusterRule::createConnection);
+ *
+ *   @ClassRule
+ *   public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
+ *     .around(miniClusterRule)
+ *     .around(connectionRule);
+ *
+ *   @Rule
+ *   public final OpenTelemetryTestRule otelTestRule =
+ *     new OpenTelemetryTestRule(otelClassRule);
+ *
+ *   @Test
+ *   public void myTest() {
+ *     // ...
+ *     // do something that makes spans
+ *     final List spans = otelClassRule.getSpans();
+ *     // make assertions on them
+ *   }
+ * }
+ * }
+ * + * @see junit5/OpenTelemetryExtension.java + */ +public final class OpenTelemetryClassRule extends ExternalResource { + + public static OpenTelemetryClassRule create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .build(); + + return new OpenTelemetryClassRule(openTelemetry, spanExporter); + } + + private final OpenTelemetrySdk openTelemetry; + private final InMemorySpanExporter spanExporter; + + private OpenTelemetryClassRule( + final OpenTelemetrySdk openTelemetry, + final InMemorySpanExporter spanExporter + ) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + } + + /** Returns the {@link OpenTelemetry} created by this Rule. */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + /** Returns all the exported {@link SpanData} so far. */ + public List getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Clears the collected exported {@link SpanData}. + */ + public void clearSpans() { + spanExporter.reset(); + } + + @Override + protected void before() throws Throwable { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + protected void after() { + GlobalOpenTelemetry.resetForTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java new file mode 100644 index 00000000000..a6b50ffca29 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import org.junit.rules.ExternalResource; + +/** + * Used alongside {@link OpenTelemetryClassRule}. See that class's javadoc for details on when to + * use these classes instead of {@link io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule} and + * an example of how to use these classes together. + */ +public final class OpenTelemetryTestRule extends ExternalResource { + + private final OpenTelemetryClassRule classRuleSupplier; + + public OpenTelemetryTestRule(final OpenTelemetryClassRule classRule) { + this.classRuleSupplier = classRule; + } + + @Override + protected void before() throws Throwable { + classRuleSupplier.clearSpans(); + } +}