HBASE-26545 Implement tracing of scan

* on `AsyncTable`, both `scan` and `scanAll` methods should result in `SCAN` table operations.
* the span of the `SCAN` table operation should have children representing all the RPC calls
  involved in servicing the scan.
* when a user provides custom implementation of `AdvancedScanResultConsumer`, any spans emitted
  from the callback methods should also be tied to the span that represents the `SCAN` table
  operation. This is easily done because these callbacks are executed on the RPC thread.
* when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the
  callback methods should be also be tied to the span that represents the `SCAN` table
  operation. This accomplished by carefully passing the span instance around after it is created.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-01-20 12:39:20 -08:00 committed by Nick Dimiduk
parent 4f491fd5e4
commit 620e5c6d0a
20 changed files with 1030 additions and 231 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -27,7 +27,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetri
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -35,11 +37,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@ -85,6 +87,8 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
private final Span span;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@ -112,6 +116,21 @@ class AsyncClientScanner {
} else {
this.scanMetrics = null;
}
/*
* Assumes that the `start()` method is called immediately after construction. If this is no
* longer the case, for tracing correctness, we should move the start of the span into the
* `start()` method. The cost of doing so would be making access to the `span` safe for
* concurrent threads.
*/
span = new TableOperationSpanBuilder(conn)
.setTableName(tableName)
.setOperation(scan)
.build();
if (consumer instanceof AsyncTableResultScanner) {
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
scanner.setSpan(span);
}
}
private static final class OpenScannerResponse {
@ -140,26 +159,35 @@ class AsyncClientScanner {
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
try (Scope ignored = span.makeCurrent()) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(
loc.getRegion().getRegionName(), scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
try (Scope ignored1 = span.makeCurrent()) {
if (controller.failed()) {
final IOException e = controller.getFailed();
future.completeExceptionally(e);
TraceUtil.setError(span, e);
span.end();
return;
}
future.complete(new OpenScannerResponse(
loc, isRegionServerRemote, stub, controller, resp));
}
});
} catch (IOException e) {
// span is closed by listener attached to the Future in `openScanner()`
future.completeExceptionally(e);
}
return future;
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
private void startScan(OpenScannerResponse resp) {
@ -173,26 +201,40 @@ class AsyncClientScanner {
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
}
});
}
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
try (Scope ignored = span.makeCurrent()) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
}
private long getPrimaryTimeoutNs() {
@ -206,15 +248,24 @@ class AsyncClientScanner {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
startScan(resp);
}
startScan(resp);
});
}
public void start() {
openScanner();
try (Scope ignored = span.makeCurrent()) {
openScanner();
}
}
}

View File

@ -27,7 +27,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -50,11 +51,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -573,7 +572,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
final Context context = Context.current();
stub.scan(controller, req, resp -> {
try (Scope ignored = context.makeCurrent()) {
onComplete(controller, resp);
}
});
}
private void next() {

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -231,22 +233,29 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Span span = null;
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null; ) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
try (Scope ignored = span.makeCurrent()) {
consumer.onError(e);
}
}
}
@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}
@Override

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private ScanResumer resumer;
// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
private Span span = null;
public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
this.maxCacheSize = maxCacheSize;
@ -79,6 +82,14 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
resumer = controller.suspend();
}
Span getSpan() {
return span;
}
void setSpan(final Span span) {
this.span = span;
}
@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;

View File

@ -640,30 +640,26 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}, supplier);
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}
@Override

View File

@ -28,6 +28,8 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildT
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail;
@ -44,8 +46,10 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -106,7 +110,7 @@ public class TestAsyncTableTracing {
private AsyncConnectionImpl conn;
private AsyncTable<?> table;
private AsyncTable<ScanResultConsumer> table;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@ -452,6 +456,53 @@ public class TestAsyncTableTracing {
assertTrace("SCAN");
}
@Test
public void testScan() throws Throwable {
final CountDownLatch doneSignal = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Throwable> throwable = new AtomicReference<>();
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
table.scan(scan, new ScanResultConsumer() {
@Override public boolean onNext(Result result) {
if (result.getRow() != null) {
count.incrementAndGet();
}
return true;
}
@Override public void onError(Throwable error) {
throwable.set(error);
doneSignal.countDown();
}
@Override public void onComplete() {
doneSignal.countDown();
}
});
doneSignal.await();
if (throwable.get() != null) {
throw throwable.get();
}
assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
assertTrace("SCAN");
}
@Test
public void testGetScanner() {
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
for (Result result : scanner) {
if (result.getRow() != null) {
count++;
}
}
// do something with it.
assertThat(count, greaterThanOrEqualTo(0));
}
assertTrace("SCAN");
}
@Test
public void testExistsList() {
CompletableFuture

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import io.opentelemetry.api.common.Attributes;
@ -25,7 +26,9 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.time.Duration;
import java.util.Objects;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
@ -78,6 +81,24 @@ public final class SpanDataMatchers {
};
}
public static Matcher<SpanData> hasExceptionWithType(Matcher<? super String> matcher) {
return hasException(containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), matcher));
}
public static Matcher<SpanData> hasException(Matcher<? super Attributes> matcher) {
return new FeatureMatcher<SpanData, Attributes>(matcher,
"SpanData having Exception with Attributes that", "exception attributes") {
@Override protected Attributes featureValueOf(SpanData actual) {
return actual.getEvents()
.stream()
.filter(e -> Objects.equals(SemanticAttributes.EXCEPTION_EVENT_NAME, e.getName()))
.map(EventData::getAttributes)
.findFirst()
.orElse(null);
}
};
}
public static Matcher<SpanData> hasKind(SpanKind kind) {
return new FeatureMatcher<SpanData, SpanKind>(
equalTo(kind), "SpanData with kind that", "SpanKind") {

View File

@ -264,6 +264,11 @@ public final class StartTestingClusterOption {
return this;
}
public Builder numWorkers(int numWorkers) {
return numDataNodes(numWorkers)
.numRegionServers(numWorkers);
}
public Builder createRootDir(boolean createRootDir) {
this.createRootDir = createRootDir;
return this;

View File

@ -17,29 +17,95 @@
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
public abstract class AbstractTestAsyncTableScan {
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
.setMiniClusterOption(StartTestingClusterOption.builder()
.numWorkers(3)
.build())
.build();
protected static final ConnectionRule connectionRule =
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility();
final AsyncConnection conn = connectionRule.getAsyncConnection();
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
testingUtil.waitTableAvailable(TABLE_NAME);
conn.getTable(TABLE_NAME)
.putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i))
.addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList()))
.get();
}
}
@ClassRule
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
.around(miniClusterRule)
.around(connectionRule)
.around(new Setup());
@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
@Rule
public final TestName testName = new TestName();
protected static TableName TABLE_NAME = TableName.valueOf("async");
@ -51,53 +117,29 @@ public abstract class AbstractTestAsyncTableScan {
protected static int COUNT = 1000;
protected static AsyncConnection ASYNC_CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList())).get();
}
@AfterClass
public static void tearDown() throws Exception {
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
protected static Scan createNormalScan() {
private static Scan createNormalScan() {
return new Scan();
}
protected static Scan createBatchScan() {
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
protected static Scan createSmallResultSizeScan() {
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
protected static Scan createBatchSmallResultSizeScan() {
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
}
protected static AsyncTable<?> getRawTable() {
return ASYNC_CONN.getTable(TABLE_NAME);
private static AsyncTable<?> getRawTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
}
protected static AsyncTable<?> getTable() {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
private static AsyncTable<?> getTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
@ -131,8 +173,18 @@ public abstract class AbstractTestAsyncTableScan {
protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
/**
* Used by implementation classes to assert the correctness of spans produced under test.
*/
protected abstract void assertTraceContinuity();
/**
* Used by implementation classes to assert the correctness of spans having errors.
*/
protected abstract void assertTraceError(final Matcher<String> exceptionTypeNameMatcher);
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
assertEquals(0, results.size() % 2);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
return Result
@ -143,16 +195,25 @@ public abstract class AbstractTestAsyncTableScan {
}).collect(Collectors.toList());
}
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
}
@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(
rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is " +
rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
miniClusterRule.getTestingUtility()
.getHBaseCluster()
.getRegionServerThreads()
.stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.forEach(rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is " +
rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);
@ -169,37 +230,55 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testReversedScanAll() throws Exception {
List<Result> results = doScan(createScan().setReversed(true), -1);
List<Result> results = TraceUtil.trace(
() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
assertTraceContinuity();
}
@Test
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results =
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1);
List<Result> results = TraceUtil.trace(() ->
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
testName.getMethodName());
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
assertTraceContinuity();
}
@Test
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1);
final Scan scan = createScan()
.withStartRow(Bytes.toBytes(String.format("%03d", start)))
.setReversed(true);
List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
assertTraceContinuity();
}
@Test
public void testScanWrongColumnFamily() throws Exception {
try {
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1);
} catch (Exception e) {
assertTrue(e instanceof NoSuchColumnFamilyException ||
e.getCause() instanceof NoSuchColumnFamilyException);
public void testScanWrongColumnFamily() {
final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace(
() -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
testName.getMethodName()));
// hamcrest generic enforcement for `anyOf` is a pain; skip it
// but -- don't we always unwrap ExecutionExceptions -- bug?
if (e instanceof NoSuchColumnFamilyException) {
final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
assertThat(ex, isA(NoSuchColumnFamilyException.class));
} else if (e instanceof ExecutionException) {
final ExecutionException ex = (ExecutionException) e;
assertThat(ex, allOf(
isA(ExecutionException.class),
hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
} else {
fail("Found unexpected Exception " + e);
}
assertTraceError(endsWith(NoSuchColumnFamilyException.class.getName()));
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
/**
* Advise the scanning infrastructure to collect up to {@code limit} results.
*/
class LimitedScanResultConsumer extends SimpleScanResultConsumerImpl {
private final int limit;
public LimitedScanResultConsumer(int limit) {
this.limit = limit;
}
@Override
public synchronized boolean onNext(Result result) {
return super.onNext(result) && results.size() < limit;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,59 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
final class SimpleScanResultConsumer implements ScanResultConsumer {
/**
* A simplistic {@link ScanResultConsumer} for use in tests.
*/
public interface SimpleScanResultConsumer extends ScanResultConsumer {
private ScanMetrics scanMetrics;
List<Result> getAll() throws Exception;
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
ScanMetrics getScanMetrics();
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
class SimpleScanResultConsumerImpl implements SimpleScanResultConsumer {
private ScanMetrics scanMetrics;
protected final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
@Override
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}

View File

@ -17,24 +17,40 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -58,8 +74,8 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
AsyncTable<ScanResultConsumer> table = connectionRule.getAsyncConnection()
.getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
@ -68,11 +84,13 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
if (scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter);
TracedScanResultConsumer consumer =
new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter));
table.scan(scan, consumer);
results = consumer.getAll();
} else {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
TracedScanResultConsumer consumer =
new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
table.scan(scan, consumer);
results = consumer.getAll();
}
@ -82,49 +100,115 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
return results;
}
private static class LimitedScanResultConsumer implements ScanResultConsumer {
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(
hasName(parentSpanName),
hasStatusWithCode(StatusCode.OK),
hasEnded());
waitForSpan(parentSpanMatcher);
private final int limit;
public LimitedScanResultConsumer(int limit) {
this.limit = limit;
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
private final List<Result> results = new ArrayList<>();
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
private Throwable error;
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream()
.filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
private boolean finished = false;
final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
spans.stream()
.filter(onScanMetricsCreatedMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onScanMetricsCreatedMatcher,
hasParentSpanId(scanOperationSpanId),
hasEnded())));
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return results.size() < limit;
}
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream()
.filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onNextMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream()
.filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onCompleteMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream()
.filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
assertThat(spans, hasItem(onErrorMatcher));
spans.stream()
.filter(onErrorMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onErrorMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
}
}

View File

@ -17,21 +17,39 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -72,4 +90,66 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(
hasName(parentSpanName),
hasStatusWithCode(StatusCode.OK),
hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
}

View File

@ -123,7 +123,7 @@ public class TestAsyncTableScanMetrics {
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
throws Exception {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl();
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
}

View File

@ -17,23 +17,41 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -63,7 +81,8 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
AsyncTable<?> table = connectionRule.getAsyncConnection()
.getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
@ -84,4 +103,65 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(
hasName(parentSpanName),
hasStatusWithCode(StatusCode.OK),
hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
assertThat(spans, hasItem(allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
}

View File

@ -17,22 +17,41 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -56,8 +75,8 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer();
ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer);
TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
@ -76,4 +95,113 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(
hasName(parentSpanName),
hasStatusWithCode(StatusCode.OK),
hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream()
.filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
// RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug?
final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher)));
final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream()
.filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
assertThat(spans, hasItem(allOf(
onNextMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
final Matcher<SpanData> onCompleteMatcher =
hasName("TracedAdvancedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream()
.filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onCompleteMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans = otelClassRule.getSpans()
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream()
.filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher = allOf(
hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId),
hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher),
hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream()
.filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId)
.findAny()
.orElseThrow(AssertionError::new);
final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream()
.filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(
onCompleteMatcher,
hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK),
hasEnded())));
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.trace.TraceUtil;
/**
* A drop-in replacement for {@link BufferingScanResultConsumer} that adds tracing spans to its
* implementation of the {@link AdvancedScanResultConsumer} API.
*/
public class TracedAdvancedScanResultConsumer implements AdvancedScanResultConsumer {
private final BufferingScanResultConsumer delegate = new BufferingScanResultConsumer();
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
TraceUtil.trace(
() -> delegate.onScanMetricsCreated(scanMetrics),
"TracedAdvancedScanResultConsumer#onScanMetricsCreated");
}
@Override
public void onNext(Result[] results, ScanController controller) {
TraceUtil.trace(
() -> delegate.onNext(results, controller),
"TracedAdvancedScanResultConsumer#onNext");
}
@Override
public void onError(Throwable error) {
TraceUtil.trace(
() -> delegate.onError(error),
"TracedAdvancedScanResultConsumer#onError");
}
@Override
public void onComplete() {
TraceUtil.trace(delegate::onComplete, "TracedAdvancedScanResultConsumer#onComplete");
}
public Result take() throws IOException, InterruptedException {
return delegate.take();
}
public ScanMetrics getScanMetrics() {
return delegate.getScanMetrics();
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.trace.TraceUtil;
/**
* A wrapper over {@link SimpleScanResultConsumer} that adds tracing of spans to its
* implementation.
*/
class TracedScanResultConsumer implements SimpleScanResultConsumer {
private final SimpleScanResultConsumer delegate;
public TracedScanResultConsumer(final SimpleScanResultConsumer delegate) {
this.delegate = delegate;
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
TraceUtil.trace(
() -> delegate.onScanMetricsCreated(scanMetrics),
"TracedScanResultConsumer#onScanMetricsCreated");
}
@Override
public boolean onNext(Result result) {
return TraceUtil.trace(() -> delegate.onNext(result),
"TracedScanResultConsumer#onNext");
}
@Override
public void onError(Throwable error) {
TraceUtil.trace(() -> delegate.onError(error), "TracedScanResultConsumer#onError");
}
@Override
public void onComplete() {
TraceUtil.trace(delegate::onComplete, "TracedScanResultConsumer#onComplete");
}
@Override
public List<Result> getAll() throws Exception {
return delegate.getAll();
}
@Override
public ScanMetrics getScanMetrics() {
return delegate.getScanMetrics();
}
}

View File

@ -1881,12 +1881,13 @@
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced
-Dio.netty.eventLoopThreads=3
-Dio.netty.eventLoopThreads=3 -Dio.opentelemetry.context.enableStrictContext=true
</hbase-surefire.argLine>
<hbase-surefire.cygwin-argLine>-enableassertions -Xmx${surefire.cygwinXmx}
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
"-Djava.library.path=${hadoop.library.path};${java.library.path}"
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced
-Dio.opentelemetry.context.enableStrictContext=true
</hbase-surefire.cygwin-argLine>
<!-- Surefire argLine defaults to Linux, cygwin argLine is used in the os.windows profile -->
<argLine>${hbase-surefire.argLine}</argLine>