HBASE-26531 Trace coprocessor exec endpoints

Trace table ExecService invocations as table operations. Ensure span relationships for both table
and master invocations.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Nick Dimiduk 2022-02-16 14:03:55 +01:00 committed by Nick Dimiduk
parent a49d147d49
commit 36a5f86c50
10 changed files with 1018 additions and 73 deletions

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import io.opentelemetry.context.Context;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
/** /**
@ -280,26 +279,27 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) { CoprocessorCallback<R> callback) {
final Context context = Context.current();
CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
@Override @Override
public void onRegionComplete(RegionInfo region, R resp) { public void onRegionComplete(RegionInfo region, R resp) {
pool.execute(() -> callback.onRegionComplete(region, resp)); pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
} }
@Override @Override
public void onRegionError(RegionInfo region, Throwable error) { public void onRegionError(RegionInfo region, Throwable error) {
pool.execute(() -> callback.onRegionError(region, error)); pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
} }
@Override @Override
public void onComplete() { public void onComplete() {
pool.execute(() -> callback.onComplete()); pool.execute(context.wrap(callback::onComplete));
} }
@Override @Override
public void onError(Throwable error) { public void onError(Throwable error) {
pool.execute(() -> callback.onError(error)); pool.execute(context.wrap(() -> callback.onError(error)));
} }
}; };
CoprocessorServiceBuilder<S, R> builder = CoprocessorServiceBuilder<S, R> builder =

View File

@ -26,8 +26,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMu
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -51,17 +52,16 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -755,14 +755,22 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs); region, row, rpcTimeoutNs, operationTimeoutNs);
final Span span = Span.current();
S stub = stubMaker.apply(channel); S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>(); CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
callable.call(stub, controller, resp -> { callable.call(stub, controller, resp -> {
try (Scope ignored = span.makeCurrent()) {
if (controller.failed()) { if (controller.failed()) {
future.completeExceptionally(controller.getFailed()); final Throwable failure = controller.getFailed();
future.completeExceptionally(failure);
TraceUtil.setError(span, failure);
} else { } else {
future.complete(resp); future.complete(resp);
span.setStatus(StatusCode.OK);
}
} finally {
span.end();
} }
}); });
return future; return future;
@ -795,8 +803,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
final Span span = Span.current();
if (error != null) { if (error != null) {
callback.onError(error); callback.onError(error);
TraceUtil.setError(span, error);
span.end();
return; return;
} }
unfinishedRequest.incrementAndGet(); unfinishedRequest.incrementAndGet();
@ -807,10 +818,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
addListener( addListener(
conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs), operationTimeoutNs),
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, (l, e) -> {
locateFinished, unfinishedRequest, l, e)); try (Scope ignored = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e);
}
});
} }
addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
try (Scope ignored = span.makeCurrent()) {
if (e != null) { if (e != null) {
callback.onRegionError(region, e); callback.onRegionError(region, e);
} else { } else {
@ -819,6 +835,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
callback.onComplete(); callback.onComplete();
} }
}
}); });
} }
@ -868,10 +885,22 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public void execute() { public void execute() {
addListener(conn.getLocator().getRegionLocation(tableName, startKey, final Span span = newTableOperationSpanBuilder()
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC)
(loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, .build();
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); try (Scope ignored = span.makeCurrent()) {
final RegionLocateType regionLocateType = startKeyInclusive
? RegionLocateType.CURRENT
: RegionLocateType.AFTER;
final CompletableFuture<HRegionLocation> future = conn.getLocator()
.getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
addListener(future, (loc, error) -> {
try (Scope ignored1 = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error);
}
});
}
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError; import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -30,13 +31,11 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
@ -74,6 +73,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request, private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) { ClientService.Interface stub) {
final Context context = Context.current();
CompletableFuture<Message> future = new CompletableFuture<>(); CompletableFuture<Message> future = new CompletableFuture<>();
if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) { if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " + future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
@ -82,11 +82,8 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
} }
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
request, row, loc.getRegion().getRegionName()); request, row, loc.getRegion().getRegionName());
stub.execService(controller, csr, stub.execService(controller, csr, resp -> {
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() { try (Scope ignored = context.makeCurrent()) {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) { if (controller.failed()) {
future.completeExceptionally(controller.getFailed()); future.completeExceptionally(controller.getFailed());
} else { } else {
@ -105,16 +102,23 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
@Override @Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request, public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) { Message responsePrototype, RpcCallback<Message> done) {
final Context context = Context.current();
addListener( addListener(
conn.callerFactory.<Message> single().table(tableName).row(row) conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(), .action((c, l, s) -> {
try (Scope ignored = context.makeCurrent()) {
return rpcCall(method, request, responsePrototype, c, l, s);
}
}).call(),
(r, e) -> { (r, e) -> {
try (Scope ignored = context.makeCurrent()) {
if (e != null) { if (e != null) {
setCoprocessorError(controller, e); setCoprocessorError(controller, e);
} }
done.run(r); done.run(r);
}
}); });
} }

View File

@ -18,14 +18,15 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError; import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -46,9 +48,12 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -57,7 +62,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans; import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -65,7 +69,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/** /**
@ -458,25 +461,22 @@ class TableOverAsyncTable implements Table {
R call(RegionCoprocessorRpcChannel channel) throws Exception; R call(RegionCoprocessorRpcChannel channel) throws Exception;
} }
private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey, private <R> void coprocessorService(String serviceName, byte[] startKey, byte[] endKey,
Callback<R> callback, StubCall<R> call) throws Throwable { Callback<R> callback, StubCall<R> call) throws Throwable {
// get regions covered by the row range // get regions covered by the row range
ExecutorService pool = this.poolSupplier.get(); ExecutorService pool = Context.current().wrap(this.poolSupplier.get());
List<byte[]> keys = getStartKeysInRange(startKey, endKey); List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
try { try {
for (byte[] r : keys) { for (byte[] r : keys) {
RegionCoprocessorRpcChannel channel = coprocessorService(r); RegionCoprocessorRpcChannel channel = coprocessorService(r);
Future<R> future = pool.submit(new Callable<R>() { Future<R> future = pool.submit(() -> {
@Override
public R call() throws Exception {
R result = call.call(channel); R result = call.call(channel);
byte[] region = channel.getLastRegion(); byte[] region = channel.getLastRegion();
if (callback != null) { if (callback != null) {
callback.update(region, r, result); callback.update(region, r, result);
} }
return result; return result;
}
}); });
futures.put(r, future); futures.put(r, future);
} }
@ -492,7 +492,7 @@ class TableOverAsyncTable implements Table {
try { try {
e.getValue().get(); e.getValue().get();
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
LOG.warn("Error calling coprocessor service " + serviceName + " for row " + LOG.warn("Error calling coprocessor service {} for row {}", serviceName,
Bytes.toStringBinary(e.getKey()), ee); Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause(); throw ee.getCause();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -505,10 +505,18 @@ class TableOverAsyncTable implements Table {
@Override @Override
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable { byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
coprocssorService(service.getName(), startKey, endKey, callback, channel -> { final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
.setTableName(table.getName())
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
TraceUtil.trace(() -> {
final Context context = Context.current();
coprocessorService(service.getName(), startKey, endKey, callback, channel -> {
try (Scope ignored = context.makeCurrent()) {
T instance = ProtobufUtil.newServiceStub(service, channel); T instance = ProtobufUtil.newServiceStub(service, channel);
return callable.call(instance); return callable.call(instance);
}
}); });
}, supplier);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -516,9 +524,18 @@ class TableOverAsyncTable implements Table {
public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable { throws ServiceException, Throwable {
coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> { final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype); .setTableName(table.getName())
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
TraceUtil.trace(() -> {
final Context context = Context.current();
coprocessorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
try (Scope ignored = context.makeCurrent()) {
return (R) channel.callBlockingMethod(
methodDescriptor, null, request, responsePrototype);
}
}); });
}, supplier);
} }
@Override @Override

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.trace;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Rudimentary tool for visualizing a hierarchy of spans. Given a collection of spans, indexes
* them from parents to children and prints them out one per line, indented.
*/
@InterfaceAudience.Private
public class StringTraceRenderer {
private static final Logger logger = LoggerFactory.getLogger(StringTraceRenderer.class);
private final List<Node> graphs;
public StringTraceRenderer(final Collection<SpanData> spans) {
final Map<String, Node> spansById = indexSpansById(spans);
populateChildren(spansById);
graphs = findRoots(spansById);
}
private static Map<String, Node> indexSpansById(final Collection<SpanData> spans) {
final Map<String, Node> spansById = new HashMap<>(spans.size());
spans.forEach(span -> spansById.put(span.getSpanId(), new Node(span)));
return spansById;
}
private static void populateChildren(final Map<String, Node> spansById) {
spansById.forEach((spanId, node) -> {
final SpanData spanData = node.spanData;
final String parentSpanId = spanData.getParentSpanId();
if (Objects.equals(parentSpanId, SpanId.getInvalid())) {
return;
}
final Node parentNode = spansById.get(parentSpanId);
if (parentNode == null) {
logger.warn("Span {} has parent {} that is not found in index, {}", spanId, parentSpanId,
spanData);
return;
}
parentNode.children.put(spanId, node);
});
}
private static List<Node> findRoots(final Map<String, Node> spansById) {
return spansById.values()
.stream()
.filter(node -> Objects.equals(node.spanData.getParentSpanId(), SpanId.getInvalid()))
.collect(Collectors.toList());
}
public void render(final Consumer<String> writer) {
for (ListIterator<Node> iter = graphs.listIterator(); iter.hasNext(); ) {
final int idx = iter.nextIndex();
final Node node = iter.next();
render(writer, node, 0, idx == 0);
}
}
private static void render(
final Consumer<String> writer,
final Node node,
final int indent,
final boolean isFirst
) {
writer.accept(render(node.spanData, indent, isFirst));
final List<Node> children = new ArrayList<>(node.children.values());
for (ListIterator<Node> iter = children.listIterator(); iter.hasNext(); ) {
final int idx = iter.nextIndex();
final Node child = iter.next();
render(writer, child, indent + 2, idx == 0);
}
}
private static String render(
final SpanData spanData,
final int indent,
final boolean isFirst
) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < indent; i++) {
sb.append(' ');
}
return sb.append(isFirst ? "└─ " : "├─ ")
.append(render(spanData))
.toString();
}
private static String render(final SpanData spanData) {
return new ToStringBuilder(spanData, ToStringStyle.NO_CLASS_NAME_STYLE)
.append("spanId", spanData.getSpanId())
.append("name", spanData.getName())
.append("hasEnded", spanData.hasEnded())
.toString();
}
private static class Node {
final SpanData spanData;
final LinkedHashMap<String, Node> children;
Node(final SpanData spanData) {
this.spanData = spanData;
this.children = new LinkedHashMap<>();
}
}
}

View File

@ -99,6 +99,24 @@ public final class SpanDataMatchers {
}; };
} }
public static Matcher<SpanData> hasParentSpanId(String parentSpanId) {
return hasParentSpanId(equalTo(parentSpanId));
}
public static Matcher<SpanData> hasParentSpanId(SpanData parent) {
return hasParentSpanId(parent.getSpanId());
}
public static Matcher<SpanData> hasParentSpanId(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a parentSpanId that",
"parentSpanId"
) {
@Override protected String featureValueOf(SpanData item) {
return item.getParentSpanId();
}
};
}
public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) { public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
final Matcher<StatusCode> matcher = is(equalTo(statusCode)); final Matcher<StatusCode> matcher = is(equalTo(statusCode));
return new TypeSafeMatcher<SpanData>() { return new TypeSafeMatcher<SpanData>() {

View File

@ -109,6 +109,12 @@
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId> <artifactId>hbase-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--Some of the CPEPs use hbase server-side internals; they shouldn't!--> <!--Some of the CPEPs use hbase server-side internals; they shouldn't!-->
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
@ -213,6 +219,23 @@
<artifactId>log4j-1.2-api</artifactId> <artifactId>log4j-1.2-api</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- when depending on a test jar, maven does not pull in the transitive dependencies. require
them manually. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<!-- Skip the tests in this module --> <!-- Skip the tests in this module -->

View File

@ -0,0 +1,549 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ServiceCaller;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
/**
* Test cases to verify tracing coprocessor Endpoint execution
*/
@Category({ CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorEndpointTracing {
private static final Logger logger =
LoggerFactory.getLogger(TestCoprocessorEndpointTracing.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorEndpointTracing.class);
private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
private static final MiniClusterRule miniclusterRule = MiniClusterRule.newBuilder()
.setConfiguration(() -> {
final Configuration conf = new Configuration();
conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
return conf;
})
.build();
private static final ConnectionRule connectionRule =
new ConnectionRule(miniclusterRule::createConnection);
private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil util = miniclusterRule.getTestingUtility();
final AsyncConnection connection = connectionRule.getConnection();
final AsyncAdmin admin = connection.getAdmin();
final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build();
admin.createTable(tableDescriptor).get();
util.waitUntilAllRegionsAssigned(TEST_TABLE);
}
}
@ClassRule
public static final TestRule testRule = RuleChain.outerRule(otelClassRule)
.around(miniclusterRule)
.around(connectionRule)
.around(new Setup());
private static final TableName TEST_TABLE =
TableName.valueOf(TestCoprocessorEndpointTracing.class.getSimpleName());
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
@Rule
public OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
@Rule
public TestName testName = new TestName();
@Test
public void traceAsyncTableEndpoint() {
final AsyncConnection connection = connectionRule.getConnection();
final AsyncTable<?> table = connection.getTable(TEST_TABLE);
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final CompletableFuture<Map<byte[], String>> future = new CompletableFuture<>();
final AsyncTable.CoprocessorCallback<EchoResponseProto> callback =
new AsyncTable.CoprocessorCallback<EchoResponseProto>() {
final ConcurrentMap<byte[], String> results = new ConcurrentHashMap<>();
@Override
public void onRegionComplete(RegionInfo region, EchoResponseProto resp) {
if (!future.isDone()) {
results.put(region.getRegionName(), resp.getMessage());
}
}
@Override
public void onRegionError(RegionInfo region, Throwable error) {
if (!future.isDone()) {
future.completeExceptionally(error);
}
}
@Override
public void onComplete() {
if (!future.isDone()) {
future.complete(results);
}
}
@Override
public void onError(Throwable error) {
if (!future.isDone()) {
future.completeExceptionally(error);
}
}
};
final Map<byte[], String> results = TraceUtil.trace(() -> {
table.coprocessorService(TestProtobufRpcProto::newStub,
(stub, controller, cb) -> stub.echo(controller, request, cb), callback)
.execute();
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}, testName.getMethodName());
assertNotNull(results);
assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results));
assertThat(results.values(), everyItem(allOf(
notNullValue(),
equalTo("hello"))));
final Matcher<SpanData> parentMatcher = allOf(hasName(testName.getMethodName()), hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(tableOpMatcher));
final SpanData tableOpSpan = spans.stream()
.filter(tableOpMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.ClientService/ExecService"),
hasParentSpanId(tableOpSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceSyncTableEndpointCall() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final CoprocessorRpcUtils.BlockingRpcCallback<EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
final Map<byte[], EchoResponseProto> results = TraceUtil.trace(() -> {
try {
return table.coprocessorService(TestProtobufRpcProto.class, null, null,
t -> {
t.echo(controller, request, callback);
return callback.get();
});
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, testName.getMethodName());
assertNotNull(results);
assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results));
assertThat(results.values(), everyItem(allOf(
notNullValue(),
hasProperty("message", equalTo("hello")))));
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(tableOpMatcher));
final SpanData tableOpSpan = spans.stream()
.filter(tableOpMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.ClientService/ExecService"),
hasParentSpanId(tableOpSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceSyncTableEndpointCallAndCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final CoprocessorRpcUtils.BlockingRpcCallback<EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
final ConcurrentMap<byte[], EchoResponseProto> results = new ConcurrentHashMap<>();
TraceUtil.trace(() -> {
try {
table.coprocessorService(TestProtobufRpcProto.class, null, null,
t -> {
t.echo(controller, request, callback);
return callback.get();
},
(region, row, result) -> {
results.put(region, result);
});
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, testName.getMethodName());
assertNotNull(results);
assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results));
assertThat(results.values(), everyItem(allOf(
notNullValue(),
hasProperty("message", equalTo("hello")))));
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(tableOpMatcher));
final SpanData tableOpSpan = spans.stream()
.filter(tableOpMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.ClientService/ExecService"),
hasParentSpanId(tableOpSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final EchoResponseProto response = TraceUtil.trace(() -> {
try {
final CoprocessorRpcChannel channel = table.coprocessorService(new byte[] {});
final TestProtobufRpcProto.BlockingInterface service =
TestProtobufRpcProto.newBlockingStub(channel);
return service.echo(null, request);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, testName.getMethodName());
assertNotNull(response);
assertEquals("hello", response.getMessage());
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
/*
* This interface is really low level: it returns a Channel and expects the caller to invoke it.
* The Table instance isn't issuing a command here, it's not a table operation, so don't expect
* there to be a span like `COPROC_EXEC table`.
*/
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan));
assertThat(spans, not(hasItem(tableOpMatcher)));
}
@Test
public void traceSyncTableBatchEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final Map<byte[], EchoResponseProto> response = TraceUtil.trace(() -> {
try {
return table.batchCoprocessorService(
descriptor, request, null, null, EchoResponseProto.getDefaultInstance());
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, testName.getMethodName());
assertNotNull(response);
assertThat(response.values(), everyItem(allOf(
notNullValue(),
hasProperty("message", equalTo("hello")))));
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(tableOpMatcher));
final SpanData tableOpSpan = spans.stream()
.filter(tableOpMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.ClientService/ExecService"),
hasParentSpanId(tableOpSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceSyncTableBatchEndpointCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final ConcurrentMap<byte[], EchoResponseProto> results = new ConcurrentHashMap<>();
TraceUtil.trace(() -> {
try {
table.batchCoprocessorService(descriptor, request, null, null,
EchoResponseProto.getDefaultInstance(), (region, row, res) -> results.put(region, res));
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, testName.getMethodName());
assertNotNull(results);
assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results));
assertThat(results.values(), everyItem(allOf(
notNullValue(),
hasProperty("message", equalTo("hello")))));
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> tableOpMatcher = allOf(
hasName(containsString("COPROC_EXEC")),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(tableOpMatcher));
final SpanData tableOpSpan = spans.stream()
.filter(tableOpMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.ClientService/ExecService"),
hasParentSpanId(tableOpSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceAsyncAdminEndpoint() throws Exception {
final AsyncConnection connection = connectionRule.getConnection();
final AsyncAdmin admin = connection.getAdmin();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final ServiceCaller<TestProtobufRpcProto, EchoResponseProto> callback =
(stub, controller, cb) -> stub.echo(controller, request, cb);
final String response = TraceUtil.tracedFuture(
() -> admin.coprocessorService(TestProtobufRpcProto::newStub, callback),
testName.getMethodName())
.get()
.getMessage();
assertEquals("hello", response);
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.MasterService/ExecMasterService"),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
@Test
public void traceSyncAdminEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
try (final Admin admin = connection.getAdmin()) {
final TestProtobufRpcProto.BlockingInterface service =
TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final String response = TraceUtil.trace(() -> {
try {
return service.echo(null, request).getMessage();
} catch (ServiceException e) {
throw new RuntimeException(e);
}
}, testName.getMethodName());
assertEquals("hello", response);
}
final Matcher<SpanData> parentMatcher = allOf(
hasName(testName.getMethodName()),
hasEnded());
waitForAndLog(parentMatcher);
final List<SpanData> spans = otelClassRule.getSpans();
final SpanData testSpan = spans.stream()
.filter(parentMatcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> rpcMatcher = allOf(
hasName("hbase.pb.MasterService/ExecMasterService"),
hasParentSpanId(testSpan),
hasStatusWithCode(StatusCode.OK));
assertThat(spans, hasItem(rpcMatcher));
}
private void waitForAndLog(Matcher<SpanData> spanMatcher) {
final Configuration conf = connectionRule.getConnection().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
otelClassRule::getSpans, hasItem(spanMatcher)));
final List<SpanData> spans = otelClassRule.getSpans();
if (logger.isDebugEnabled()) {
StringTraceRenderer renderer = new StringTraceRenderer(spans);
renderer.render(logger::debug);
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.List;
import org.junit.rules.ExternalResource;
/**
* <p>Like {@link OpenTelemetryRule}, except modeled after the junit5 implementation
* {@code OpenTelemetryExtension}. Use this class when you need to make asserts on {@link SpanData}
* created on a MiniCluster. Make sure this rule initialized before the MiniCluster so that it can
* register its instance of {@link OpenTelemetry} as the global instance before any server-side
* component can call {@link TraceUtil#getGlobalTracer()}.</p>
* <p>For example:</p>
* <pre>{@code
* public class TestMyClass {
* private static final OpenTelemetryClassRule otelClassRule =
* OpenTelemetryClassRule.create();
* private static final MiniClusterRule miniClusterRule =
* MiniClusterRule.newBuilder().build();
* protected static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
*
* @ClassRule
* public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
* .around(miniClusterRule)
* .around(connectionRule);
*
* @Rule
* public final OpenTelemetryTestRule otelTestRule =
* new OpenTelemetryTestRule(otelClassRule);
*
* @Test
* public void myTest() {
* // ...
* // do something that makes spans
* final List<SpanData> spans = otelClassRule.getSpans();
* // make assertions on them
* }
* }
* }</pre>
*
* @see <a href="https://github.com/open-telemetry/opentelemetry-java/blob/9a330d0/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/junit5/OpenTelemetryExtension.java">junit5/OpenTelemetryExtension.java</a>
*/
public final class OpenTelemetryClassRule extends ExternalResource {
public static OpenTelemetryClassRule create() {
InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();
OpenTelemetrySdk openTelemetry =
OpenTelemetrySdk.builder()
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.setTracerProvider(tracerProvider)
.build();
return new OpenTelemetryClassRule(openTelemetry, spanExporter);
}
private final OpenTelemetrySdk openTelemetry;
private final InMemorySpanExporter spanExporter;
private OpenTelemetryClassRule(
final OpenTelemetrySdk openTelemetry,
final InMemorySpanExporter spanExporter
) {
this.openTelemetry = openTelemetry;
this.spanExporter = spanExporter;
}
/** Returns the {@link OpenTelemetry} created by this Rule. */
public OpenTelemetry getOpenTelemetry() {
return openTelemetry;
}
/** Returns all the exported {@link SpanData} so far. */
public List<SpanData> getSpans() {
return spanExporter.getFinishedSpanItems();
}
/**
* Clears the collected exported {@link SpanData}.
*/
public void clearSpans() {
spanExporter.reset();
}
@Override
protected void before() throws Throwable {
GlobalOpenTelemetry.resetForTest();
GlobalOpenTelemetry.set(openTelemetry);
}
@Override
protected void after() {
GlobalOpenTelemetry.resetForTest();
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.trace;
import org.junit.rules.ExternalResource;
/**
* Used alongside {@link OpenTelemetryClassRule}. See that class's javadoc for details on when to
* use these classes instead of {@link io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule} and
* an example of how to use these classes together.
*/
public final class OpenTelemetryTestRule extends ExternalResource {
private final OpenTelemetryClassRule classRuleSupplier;
public OpenTelemetryTestRule(final OpenTelemetryClassRule classRule) {
this.classRuleSupplier = classRule;
}
@Override
protected void before() throws Throwable {
classRuleSupplier.clearSpans();
}
}