HBASE-26545 Implement tracing of scan

* on `AsyncTable`, both `scan` and `scanAll` methods should result in `SCAN` table operations.
* the span of the `SCAN` table operation should have children representing all the RPC calls
  involved in servicing the scan.
* when a user provides custom implementation of `AdvancedScanResultConsumer`, any spans emitted
  from the callback methods should also be tied to the span that represents the `SCAN` table
  operation. This is easily done because these callbacks are executed on the RPC thread.
* when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the
  callback methods should be also be tied to the span that represents the `SCAN` table
  operation. This accomplished by carefully passing the span instance around after it is created.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-03-31 17:29:51 +02:00 committed by Nick Dimiduk
parent 69ea6f579f
commit 235308d8bf
26 changed files with 2295 additions and 1430 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -28,6 +28,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -35,7 +38,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
@ -85,6 +90,8 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
private final Span span;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@ -112,6 +119,18 @@ class AsyncClientScanner {
} else {
this.scanMetrics = null;
}
/*
* Assumes that the `start()` method is called immediately after construction. If this is no
* longer the case, for tracing correctness, we should move the start of the span into the
* `start()` method. The cost of doing so would be making access to the `span` safe for
* concurrent threads.
*/
span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build();
if (consumer instanceof AsyncTableResultScanner) {
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
scanner.setSpan(span);
}
}
private static final class OpenScannerResponse {
@ -140,64 +159,87 @@ class AsyncClientScanner {
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
try (Scope ignored = span.makeCurrent()) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
try (Scope ignored1 = span.makeCurrent()) {
if (controller.failed()) {
final IOException e = controller.getFailed();
future.completeExceptionally(e);
TraceUtil.setError(span, e);
span.end();
return;
}
future.complete(
new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
}
});
} catch (IOException e) {
// span is closed by listener attached to the Future in `openScanner()`
future.completeExceptionally(e);
}
return future;
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
private void startScan(OpenScannerResponse resp) {
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
}
});
}
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
try (Scope ignored = span.makeCurrent()) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.action(this::callOpenScanner).call();
}
}
private long getPrimaryTimeoutNs() {
return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
: conn.connConf.getPrimaryScanTimeoutNs();
: conn.connConf.getPrimaryScanTimeoutNs();
}
private void openScanner() {
@ -206,15 +248,24 @@ class AsyncClientScanner {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
startScan(resp);
}
startScan(resp);
});
}
public void start() {
openScanner();
try (Scope ignored = span.makeCurrent()) {
openScanner();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -28,6 +28,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -170,8 +172,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void preCheck() {
Preconditions.checkState(Thread.currentThread() == callerThread,
"The current thread is %s, expected thread is %s, " +
"you should not call this method outside onNext or onHeartbeat",
"The current thread is %s, expected thread is %s, "
+ "you should not call this method outside onNext or onHeartbeat",
Thread.currentThread(), callerThread);
Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
"Invalid Stopper state %s", state);
@ -201,7 +203,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
@Override
public Optional<Cursor> cursor() {
return cursor;
return cursor;
}
}
@ -352,9 +354,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
" for " + loc.getRegion().getEncodedName() + " of " +
loc.getRegion().getTable() + " failed, ignore, probably already closed",
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
+ " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
+ " failed, ignore, probably already closed",
controller.getFailed());
}
});
@ -392,19 +394,19 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void onError(Throwable error) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
" failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
" ms",
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+ loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
+ " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+ " ms",
error);
}
boolean scannerClosed =
error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
boolean scannerClosed = error instanceof UnknownScannerException
|| error instanceof NotServingRegionException
|| error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (tries >= maxAttempts) {
completeExceptionally(!scannerClosed);
@ -573,7 +575,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
final Context context = Context.current();
stub.scan(controller, req, resp -> {
try (Scope ignored = context.makeCurrent()) {
onComplete(controller, resp);
}
});
}
private void next() {

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import com.google.protobuf.RpcChannel;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -177,8 +180,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilder() {
private final CheckAndMutateWithFilterBuilder builder =
rawTable.checkAndMutate(row, filter);
private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@ -209,10 +211,9 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
@Override
public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream()
.map(this::wrap).collect(toList());
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
}
@Override
@ -231,22 +232,29 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Span span = null;
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
try (Scope ignored = span.makeCurrent()) {
consumer.onError(e);
}
}
}
@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}
@Override
@ -303,7 +311,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
};
CoprocessorServiceBuilder<S, R> builder =
rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
return new CoprocessorServiceBuilder<S, R>() {
@Override

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
@ -58,6 +59,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private ScanResumer resumer;
// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
private Span span = null;
public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
this.maxCacheSize = maxCacheSize;
@ -71,14 +75,22 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private void stopPrefetch(ScanController controller) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} stop prefetching when scanning {} as the cache size {}" +
" is greater than the maxCacheSize {}",
String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize,
maxCacheSize);
LOG.debug(
"{} stop prefetching when scanning {} as the cache size {}"
+ " is greater than the maxCacheSize {}",
String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize, maxCacheSize);
}
resumer = controller.suspend();
}
Span getSpan() {
return span;
}
void setSpan(final Span span) {
this.span = span;
}
@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Queue;
@ -36,14 +39,13 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
/**
* ClientAsyncPrefetchScanner implements async scanner behaviour.
* Specifically, the cache used by this scanner is a concurrent queue which allows both
* the producer (hbase client) and consumer (application) to access the queue in parallel.
* The number of rows returned in a prefetch is defined by the caching factor and the result size
* factor.
* This class allocates a buffer cache, whose size is a function of both factors.
* The prefetch is invoked when the cache is half­filled, instead of waiting for it to be empty.
* This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
* ClientAsyncPrefetchScanner implements async scanner behaviour. Specifically, the cache used by
* this scanner is a concurrent queue which allows both the producer (hbase client) and consumer
* (application) to access the queue in parallel. The number of rows returned in a prefetch is
* defined by the caching factor and the result size factor. This class allocates a buffer cache,
* whose size is a function of both factors. The prefetch is invoked when the cache is half-filled,
* instead of waiting for it to be empty. This is defined in the method
* {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
*/
@InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@ -66,7 +68,9 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
exceptionsQueue = new ConcurrentLinkedQueue<>();
Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
Threads.setDaemonThreadRunning(new Thread(runnable), name + ".asyncPrefetcher");
}
void setPrefetchListener(Consumer<Boolean> prefetchListener) {
@ -88,7 +92,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override
public Result next() throws IOException {
try {
try (Scope ignored = span.makeCurrent()) {
lock.lock();
while (cache.isEmpty()) {
handleException();
@ -98,6 +102,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
try {
notEmpty.await();
} catch (InterruptedException e) {
span.recordException(e);
throw new InterruptedIOException("Interrupted when wait to load cache");
}
}
@ -132,8 +137,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
}
private void handleException() throws IOException {
//The prefetch task running in the background puts any exception it
//catches into this exception queue.
// The prefetch task running in the background puts any exception it
// catches into this exception queue.
// Rethrow the exception so the application can handle it.
while (!exceptionsQueue.isEmpty()) {
Exception first = exceptionsQueue.peek();
@ -171,6 +176,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
succeed = true;
} catch (Exception e) {
exceptionsQueue.add(e);
span.recordException(e);
} finally {
notEmpty.signalAll();
lock.unlock();
@ -180,7 +186,6 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
}
}
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -21,6 +21,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
@ -40,13 +43,14 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
* this scanner will iterate through them all.
@ -76,6 +80,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected RpcRetryingCaller<Result[]> caller;
protected RpcControllerFactory rpcControllerFactory;
protected Configuration conf;
protected final Span span;
// The timeout on the primary. Applicable if there are multiple replicas for a region
// In that case, we will only wait for this much timeout on the primary before going
// to the replicas and trying the same scan. Note that the retries will still happen
@ -92,7 +97,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
@ -117,7 +121,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
// check if application wants to collect scan metrics
initScanMetrics(scan);
@ -134,14 +138,14 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
this.span = Span.current();
this.scanResultCache = createScanResultCache(scan);
initCache();
}
protected final int getScanReplicaId() {
return scan.getReplicaId() >= RegionReplicaUtil.DEFAULT_REPLICA_ID ? scan.getReplicaId() :
RegionReplicaUtil.DEFAULT_REPLICA_ID;
return Math.max(scan.getReplicaId(), RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
protected ClusterConnection getConnection() {
@ -238,8 +242,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (LOG.isDebugEnabled() && this.currentRegion != null) {
// Only worth logging if NOT first region in scan.
LOG.debug(
"Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) +
"', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
"Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow())
+ "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
}
// clear the current region, we will set a new value to it after the first call of the new
// callable.
@ -331,8 +335,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// old time we always return empty result for a open scanner operation so we add a check here to
// keep compatible with the old logic. Should remove the isOpenScanner in the future.
// 2. Server tells us that it has no more results for this region.
return (values.length == 0 && !callable.isHeartbeatMessage()) ||
callable.moreResultsInRegion() == MoreResults.NO;
return (values.length == 0 && !callable.isHeartbeatMessage())
|| callable.moreResultsInRegion() == MoreResults.NO;
}
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
@ -362,10 +366,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If exception is any but the list below throw it back to the client; else setup
// the scanner and retry.
Throwable cause = e.getCause();
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
e instanceof ScannerResetException || e instanceof LeaseException) {
if ((cause != null && cause instanceof NotServingRegionException)
|| (cause != null && cause instanceof RegionServerStoppedException)
|| e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException
|| e instanceof ScannerResetException || e instanceof LeaseException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
if (retriesLeft <= 0) {
@ -489,8 +493,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller
LOG.trace("Heartbeat message received and cache contains Results. " +
"Breaking out of scan loop");
LOG.trace("Heartbeat message received and cache contains Results. "
+ "Breaking out of scan loop");
// we know that the region has not been exhausted yet so just break without calling
// closeScannerIfExhausted
break;
@ -546,40 +550,52 @@ public abstract class ClientScanner extends AbstractClientScanner {
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
if (callable != null) {
callable.setClose();
try {
call(callable, caller, scannerTimeout, false);
} catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
// throw exceptions. Chances are it was just due to lease time out.
LOG.debug("scanner failed to close", e);
} catch (IOException e) {
/* An exception other than UnknownScanner is unexpected. */
LOG.warn("scanner failed to close.", e);
try (Scope ignored = span.makeCurrent()) {
if (!scanMetricsPublished) {
writeScanMetrics();
}
callable = null;
if (callable != null) {
callable.setClose();
try {
call(callable, caller, scannerTimeout, false);
} catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
// throw exceptions. Chances are it was just due to lease time out.
LOG.debug("scanner failed to close", e);
} catch (IOException e) {
/* An exception other than UnknownScanner is unexpected. */
LOG.warn("scanner failed to close.", e);
span.recordException(e);
span.setStatus(StatusCode.ERROR);
}
callable = null;
}
closed = true;
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
closed = true;
}
@Override
public boolean renewLease() {
if (callable == null) {
return false;
}
// do not return any rows, do not advance the scanner
callable.setRenew(true);
try {
this.caller.callWithoutRetries(callable, this.scannerTimeout);
return true;
} catch (Exception e) {
LOG.debug("scanner failed to renew lease", e);
return false;
} finally {
callable.setRenew(false);
try (Scope ignored = span.makeCurrent()) {
if (callable == null) {
return false;
}
// do not return any rows, do not advance the scanner
callable.setRenew(true);
try {
this.caller.callWithoutRetries(callable, this.scannerTimeout);
return true;
} catch (Exception e) {
LOG.debug("scanner failed to renew lease", e);
span.recordException(e);
return false;
} finally {
callable.setRenew(false);
}
}
}
@ -589,6 +605,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
@Override
public Result next() throws IOException {
return nextWithSyncCache();
try (Scope ignored = span.makeCurrent()) {
return nextWithSyncCache();
}
}
}

View File

@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMu
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
@ -58,9 +59,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -127,8 +130,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
this.pauseNs = builder.pauseNs;
if (builder.pauseForCQTBENs < builder.pauseNs) {
LOG.warn(
"Configured value of pauseForCQTBENs is {} ms, which is less than" +
" the normal pause value {} ms, use the greater one instead",
"Configured value of pauseForCQTBENs is {} ms, which is less than"
+ " the normal pause value {} ms, use the greater one instead",
TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
this.pauseForCQTBENs = builder.pauseNs;
@ -137,8 +140,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() :
conn.connConf.getScannerCaching();
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
: conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@ -210,15 +213,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert) {
HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert) {
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
return null;
});
}
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
throws IOException {
throws IOException {
if (!resp.hasResult()) {
return null;
}
@ -231,33 +234,33 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
newCaller(R row, long rpcTimeoutNs) {
newCaller(R row, long rpcTimeoutNs) {
return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
}
private CompletableFuture<Result> get(Get get, int replicaId) {
return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
.replicaId(replicaId).call();
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
.replicaId(replicaId).call();
}
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
@ -266,8 +269,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> get(Get get) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(get);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get);
return tracedFuture(
() -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
@ -278,20 +280,18 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(put);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put);
return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call(), supplier);
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call(),
supplier);
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(delete);
return tracedFuture(
() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete);
return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call(),
@ -301,32 +301,30 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(append);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result, Append> newCaller(append, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, append, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, append, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, supplier);
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(increment);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup,
nonce, controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, supplier);
}
@ -351,8 +349,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
" an empty byte array, or just do not call this method if you want a null qualifier");
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
+ " an empty byte array, or just do not call this method if you want a null qualifier");
return this;
}
@ -377,8 +375,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private void preCheck() {
Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
" calling ifNotExists/ifEquals/ifMatches before executing the request");
Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
+ " calling ifNotExists/ifEquals/ifMatches before executing the request");
}
@Override
@ -386,15 +384,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
supplier);
}
@ -402,10 +400,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
@ -419,10 +417,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
preCheck();
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutations,
@ -440,7 +437,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private final class CheckAndMutateWithFilterBuilderImpl
implements CheckAndMutateWithFilterBuilder {
implements CheckAndMutateWithFilterBuilder {
private final byte[] row;
@ -463,26 +460,25 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
supplier);
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
@ -495,10 +491,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutations,
@ -517,14 +512,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
checkAndMutate.getAction() instanceof Increment ||
checkAndMutate.getAction() instanceof Append) {
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|| checkAndMutate.getAction() instanceof Increment
|| checkAndMutate.getAction() instanceof Append) {
Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
@ -532,37 +525,38 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
rpcTimeoutNs)
.action(
(controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
rpcTimeoutNs)
.action(
(controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this
.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(controller, loc, stub,
rowMutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this
.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(controller, loc, stub,
rowMutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup,
nonce),
resp -> resp))
.call();
} else {
CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
future.completeExceptionally(new DoNotRetryIOException(
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
}, supplier);
@ -570,12 +564,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
supplier);
}
@ -584,8 +576,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
// so here I write a new method as I do not want to change the abstraction of call method.
@SuppressWarnings("unchecked")
private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
try {
byte[] regionName = loc.getRegion().getRegionName();
@ -599,14 +591,16 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} else {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp =
ResponseConverter.getResults(req, resp, controller.cellScanner());
ResponseConverter.getResults(req, resp, controller.cellScanner());
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
loc.getServerName(), multiResp);
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex :
new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
future
.completeExceptionally(ex instanceof IOException ? ex
: new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
ex));
} else {
future.complete(
respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
@ -628,11 +622,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
final Supplier<Span> supplier =
newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations);
return tracedFuture(() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
@ -656,8 +648,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
.start();
pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
.start();
}
private long resultSize2CacheSize(long maxResultSize) {
@ -671,74 +663,66 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
final AsyncTableResultScanner scanner =
new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
scan(scan, scanner);
return scanner;
}
@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}, supplier);
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}
@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}
@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions)
.setContainerOperations(actions);
final Supplier<Span> supplier =
newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
.map(f -> f.<Void> thenApply(r -> null)).collect(toList());
.map(f -> f.<Void> thenApply(r -> null)).collect(toList());
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
@ -758,10 +742,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
}
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).call();
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
@ -790,9 +774,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
region, row, rpcTimeoutNs, operationTimeoutNs);
final Span span = Span.current();
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
@ -816,7 +800,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, byte[] row) {
ServiceCaller<S, R> callable, byte[] row) {
return coprocessorService(stubMaker, callable, null, row);
}
@ -838,9 +822,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
final Span span = Span.current();
if (error != null) {
callback.onError(error);
@ -853,10 +837,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true);
} else {
addListener(
conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs),
(l, e) -> {
addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(),
RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> {
try (Scope ignored = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e);
@ -878,7 +860,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private final class CoprocessorServiceBuilderImpl<S, R>
implements CoprocessorServiceBuilder<S, R> {
implements CoprocessorServiceBuilder<S, R> {
private final Function<RpcChannel, S> stubMaker;
@ -895,7 +877,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private boolean endKeyInclusive;
public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
this.callable = Preconditions.checkNotNull(callable, "callable is null");
this.callback = Preconditions.checkNotNull(callback, "callback is null");
@ -904,9 +886,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
this.startKey = Preconditions.checkNotNull(startKey,
"startKey is null. Consider using" +
" an empty byte array, or just do not call this method if you want to start selection" +
" from the first region");
"startKey is null. Consider using"
+ " an empty byte array, or just do not call this method if you want to start selection"
+ " from the first region");
this.startKeyInclusive = inclusive;
return this;
}
@ -914,9 +896,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
this.endKey = Preconditions.checkNotNull(endKey,
"endKey is null. Consider using" +
" an empty byte array, or just do not call this method if you want to continue" +
" selection to the last region");
"endKey is null. Consider using"
+ " an empty byte array, or just do not call this method if you want to continue"
+ " selection to the last region");
this.endKeyInclusive = inclusive;
return this;
}
@ -924,14 +906,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void execute() {
final Span span = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC)
.build();
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build();
try (Scope ignored = span.makeCurrent()) {
final RegionLocateType regionLocateType = startKeyInclusive
? RegionLocateType.CURRENT
: RegionLocateType.AFTER;
final RegionLocateType regionLocateType =
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
final CompletableFuture<HRegionLocation> future = conn.getLocator()
.getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
.getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
addListener(future, (loc, error) -> {
try (Scope ignored1 = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
@ -944,8 +924,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
}
}

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,9 +23,8 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Interface for client-side scanning. Go to {@link Table} to obtain instances.
@ -50,7 +48,8 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
return true;
}
try {
return (next = ResultScanner.this.next()) != null;
next = ResultScanner.this.next();
return next != null;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@ -89,7 +88,6 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
* @param nbRows number of rows to return
* @return Between zero and nbRows rowResults. Scan is done if returned array is of zero-length
* (We never return null).
* @throws IOException
*/
default Result[] next(int nbRows) throws IOException {
List<Result> resultSets = new ArrayList<>(nbRows);

View File

@ -28,6 +28,8 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildT
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail;
@ -36,6 +38,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@ -44,8 +47,10 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -75,8 +80,10 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@ -97,7 +104,7 @@ public class TestAsyncTableTracing {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
private static Configuration CONF = HBaseConfiguration.create();
@ -105,7 +112,7 @@ public class TestAsyncTableTracing {
private AsyncConnectionImpl conn;
private AsyncTable<?> table;
private AsyncTable<ScanResultConsumer> table;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@ -122,18 +129,18 @@ public class TestAsyncTableTracing {
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
.setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.addResults(ProtobufUtil.toResult(result));
.addResults(ProtobufUtil.toResult(result));
if (req.getLimitOfRows() == 1) {
builder.setMoreResultsInRegion(false).setMoreResults(false);
} else {
@ -175,13 +182,13 @@ public class TestAsyncTableTracing {
case INCREMENT:
ColumnValue value = req.getColumnValue(0);
QualifierValue qvalue = value.getQualifierValue(0);
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setType(Cell.Type.Put).setRow(req.getRow().toByteArray())
.setFamily(value.getFamily().toByteArray())
.setQualifier(qvalue.getQualifier().toByteArray())
.setValue(qvalue.getValue().toByteArray()).build();
Cell cell =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
.setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
.setQualifier(qvalue.getQualifier().toByteArray())
.setValue(qvalue.getValue().toByteArray()).build();
resp = MutateResponse.newBuilder()
.setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
.setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
break;
default:
resp = MutateResponse.getDefaultInstance();
@ -202,25 +209,24 @@ public class TestAsyncTableTracing {
}
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
final User user = UserProvider.instantiate(CONF).getCurrent();
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test",
user) {
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", user) {
@Override
AsyncRegionLocator getLocator() {
AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
Answer<CompletableFuture<HRegionLocation>> answer =
new Answer<CompletableFuture<HRegionLocation>>() {
new Answer<CompletableFuture<HRegionLocation>>() {
@Override
public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
throws Throwable {
TableName tableName = invocation.getArgument(0);
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
HRegionLocation loc = new HRegionLocation(info, serverName);
return CompletableFuture.completedFuture(loc);
}
};
@Override
public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
throws Throwable {
TableName tableName = invocation.getArgument(0);
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
HRegionLocation loc = new HRegionLocation(info, serverName);
return CompletableFuture.completedFuture(loc);
}
};
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
any(RegionLocateType.class), anyLong());
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
@ -249,26 +255,19 @@ public class TestAsyncTableTracing {
// n.b. this method implementation must match the one of the same name found in
// TestHTableTracing
final TableName tableName = table.getName();
final Matcher<SpanData> spanLocator = allOf(
hasName(containsString(tableOperation)), hasEnded());
final Matcher<SpanData> spanLocator =
allOf(hasName(containsString(tableOperation)), hasEnded());
final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator)));
List<SpanData> candidateSpans = traceRule.getSpans()
.stream()
.filter(spanLocator::matches)
.collect(Collectors.toList());
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator)));
List<SpanData> candidateSpans =
traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
assertThat(candidateSpans, hasSize(1));
SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),
hasStatusWithCode(StatusCode.OK),
buildConnectionAttributesMatcher(conn),
buildTableAttributesMatcher(tableName),
matcher));
assertThat(data,
allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
}
@Test
@ -306,16 +305,16 @@ public class TestAsyncTableTracing {
@Test
public void testIncrement() {
table
.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
.join();
.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
.join();
assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue1() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
.join();
.join();
assertTrace("INCREMENT");
}
@ -329,38 +328,37 @@ public class TestAsyncTableTracing {
@Test
public void testCheckAndMutate() {
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))).join();
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))).join();
assertTrace("CHECK_AND_MUTATE");
}
@Test
public void testCheckAndMutateList() {
CompletableFuture
.allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
.allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
"CHECK_AND_MUTATE", "DELETE")));
}
@Test
public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
"CHECK_AND_MUTATE", "DELETE")));
}
private void testCheckAndMutateBuilder(Row op) {
AsyncTable.CheckAndMutateBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"));
table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
@ -378,8 +376,8 @@ public class TestAsyncTableTracing {
@Test
public void testCheckAndMutateBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
Bytes.toBytes("v"));
testCheckAndMutateBuilder(put);
}
@ -390,17 +388,18 @@ public class TestAsyncTableTracing {
@Test
public void testCheckAndMutateBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add((Mutation) (new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"))))
.add((Mutation) new Delete(Bytes.toBytes(0)));
RowMutations mutations =
new RowMutations(Bytes.toBytes(0))
.add((Mutation) (new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
Bytes.toBytes("cq"), Bytes.toBytes("v"))))
.add((Mutation) new Delete(Bytes.toBytes(0)));
testCheckAndMutateBuilder(mutations);
}
private void testCheckAndMutateWithFilterBuilder(Row op) {
// use of `PrefixFilter` is completely arbitrary here.
AsyncTable.CheckAndMutateWithFilterBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
@ -418,8 +417,8 @@ public class TestAsyncTableTracing {
@Test
public void testCheckAndMutateWithFilterBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
Bytes.toBytes("v"));
testCheckAndMutateWithFilterBuilder(put);
}
@ -430,19 +429,21 @@ public class TestAsyncTableTracing {
@Test
public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add((Mutation) new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add((Mutation) new Delete(Bytes.toBytes(0)));
RowMutations mutations =
new RowMutations(Bytes.toBytes(0))
.add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add((Mutation) new Delete(Bytes.toBytes(0)));
testCheckAndMutateWithFilterBuilder(mutations);
}
@Test
public void testMutateRow() throws IOException {
final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add((Mutation) new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add((Mutation) new Delete(Bytes.toBytes(0)));
final RowMutations mutations =
new RowMutations(Bytes.toBytes(0))
.add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add((Mutation) new Delete(Bytes.toBytes(0)));
table.mutateRow(mutations).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
@ -454,88 +455,137 @@ public class TestAsyncTableTracing {
assertTrace("SCAN");
}
@Test
public void testScan() throws Throwable {
final CountDownLatch doneSignal = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Throwable> throwable = new AtomicReference<>();
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
table.scan(scan, new ScanResultConsumer() {
@Override
public boolean onNext(Result result) {
if (result.getRow() != null) {
count.incrementAndGet();
}
return true;
}
@Override
public void onError(Throwable error) {
throwable.set(error);
doneSignal.countDown();
}
@Override
public void onComplete() {
doneSignal.countDown();
}
});
doneSignal.await();
if (throwable.get() != null) {
throw throwable.get();
}
assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
assertTrace("SCAN");
}
@Test
public void testGetScanner() {
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
for (Result result : scanner) {
if (result.getRow() != null) {
count++;
}
}
// do something with it.
assertThat(count, greaterThanOrEqualTo(0));
}
assertTrace("SCAN");
}
@Test
public void testExistsList() {
CompletableFuture
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}
@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}
@Test
public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
.allOf(
table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}
@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}
@Test
public void testPutList() {
CompletableFuture
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}
@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}
@Test
public void testDeleteList() {
CompletableFuture
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
CompletableFuture.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
@Test
public void testBatch() {
CompletableFuture
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
CompletableFuture.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
assertTrace("BATCH",
hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
}

View File

@ -17,15 +17,19 @@
*/
package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.time.Duration;
import java.util.Objects;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
@ -36,21 +40,22 @@ import org.hamcrest.TypeSafeMatcher;
*/
public final class SpanDataMatchers {
private SpanDataMatchers() { }
private SpanDataMatchers() {
}
public static Matcher<SpanData> hasAttributes(Matcher<Attributes> matcher) {
return new FeatureMatcher<SpanData, Attributes>(
matcher, "SpanData having attributes that ", "attributes"
) {
@Override protected Attributes featureValueOf(SpanData item) {
return new FeatureMatcher<SpanData, Attributes>(matcher, "SpanData having attributes that ",
"attributes") {
@Override
protected Attributes featureValueOf(SpanData item) {
return item.getAttributes();
}
};
}
public static Matcher<SpanData> hasDuration(Matcher<Duration> matcher) {
return new FeatureMatcher<SpanData, Duration>(
matcher, "SpanData having duration that ", "duration") {
return new FeatureMatcher<SpanData, Duration>(matcher, "SpanData having duration that ",
"duration") {
@Override
protected Duration featureValueOf(SpanData item) {
return Duration.ofNanos(item.getEndEpochNanos() - item.getStartEpochNanos());
@ -60,28 +65,49 @@ public final class SpanDataMatchers {
public static Matcher<SpanData> hasEnded() {
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
@Override
protected boolean matchesSafely(SpanData item) {
return item.hasEnded();
}
@Override public void describeTo(Description description) {
@Override
public void describeTo(Description description) {
description.appendText("SpanData that hasEnded");
}
};
}
public static Matcher<SpanData> hasEvents(Matcher<Iterable<? super EventData>> matcher) {
return new FeatureMatcher<SpanData, Iterable<? super EventData>>(
matcher, "SpanData having events that", "events") {
@Override protected Iterable<? super EventData> featureValueOf(SpanData item) {
return new FeatureMatcher<SpanData, Iterable<? super EventData>>(matcher,
"SpanData having events that", "events") {
@Override
protected Iterable<? super EventData> featureValueOf(SpanData item) {
return item.getEvents();
}
};
}
public static Matcher<SpanData> hasExceptionWithType(Matcher<? super String> matcher) {
return hasException(containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), matcher));
}
public static Matcher<SpanData> hasException(Matcher<? super Attributes> matcher) {
return new FeatureMatcher<SpanData, Attributes>(matcher,
"SpanData having Exception with Attributes that", "exception attributes") {
@Override
protected Attributes featureValueOf(SpanData actual) {
return actual.getEvents().stream()
.filter(e -> Objects.equals(SemanticAttributes.EXCEPTION_EVENT_NAME, e.getName()))
.map(EventData::getAttributes).findFirst().orElse(null);
}
};
}
public static Matcher<SpanData> hasKind(SpanKind kind) {
return new FeatureMatcher<SpanData, SpanKind>(
equalTo(kind), "SpanData with kind that", "SpanKind") {
@Override protected SpanKind featureValueOf(SpanData item) {
return new FeatureMatcher<SpanData, SpanKind>(equalTo(kind), "SpanData with kind that",
"SpanKind") {
@Override
protected SpanKind featureValueOf(SpanData item) {
return item.getKind();
}
};
@ -93,7 +119,8 @@ public final class SpanDataMatchers {
public static Matcher<SpanData> hasName(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a name that", "name") {
@Override protected String featureValueOf(SpanData item) {
@Override
protected String featureValueOf(SpanData item) {
return item.getName();
}
};
@ -109,9 +136,9 @@ public final class SpanDataMatchers {
public static Matcher<SpanData> hasParentSpanId(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a parentSpanId that",
"parentSpanId"
) {
@Override protected String featureValueOf(SpanData item) {
"parentSpanId") {
@Override
protected String featureValueOf(SpanData item) {
return item.getParentSpanId();
}
};
@ -120,13 +147,15 @@ public final class SpanDataMatchers {
public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
final Matcher<StatusCode> matcher = is(equalTo(statusCode));
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
@Override
protected boolean matchesSafely(SpanData item) {
final StatusData statusData = item.getStatus();
return statusData != null
&& statusData.getStatusCode() != null
&& matcher.matches(statusData.getStatusCode());
return statusData != null && statusData.getStatusCode() != null
&& matcher.matches(statusData.getStatusCode());
}
@Override public void describeTo(Description description) {
@Override
public void describeTo(Description description) {
description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
}
};
@ -137,9 +166,10 @@ public final class SpanDataMatchers {
}
public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(
matcher, "SpanData with a traceId that ", "traceId") {
@Override protected String featureValueOf(SpanData item) {
return new FeatureMatcher<SpanData, String>(matcher, "SpanData with a traceId that ",
"traceId") {
@Override
protected String featureValueOf(SpanData item) {
return item.getTraceId();
}
};

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,7 +19,6 @@ package org.apache.hadoop.hbase;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.yetus.audience.InterfaceAudience;
@ -28,10 +26,9 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Options for starting up a mini cluster (including an hbase, dfs and zookeeper clusters) in test.
* The options include HDFS options to build mini dfs cluster, Zookeeper options to build mini zk
* cluster, and mostly HBase options to build mini hbase cluster.
* cluster, and mostly HBase options to build mini hbase cluster. To create an object, use a
* {@link Builder}. Example usage:
*
* To create an object, use a {@link Builder}.
* Example usage:
* <pre>
* StartMiniClusterOption option = StartMiniClusterOption.builder().
* .numMasters(3).rsClass(MyRegionServer.class).createWALDir(true).build();
@ -42,8 +39,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Public
public final class StartMiniClusterOption {
/**
* Number of masters to start up. We'll start this many hbase masters. If numMasters > 1, you
* can find the active/primary master with {@link MiniHBaseCluster#getMaster()}.
* Number of masters to start up. We'll start this many hbase masters. If numMasters > 1, you can
* find the active/primary master with {@link MiniHBaseCluster#getMaster()}.
*/
private final int numMasters;
@ -60,9 +57,8 @@ public final class StartMiniClusterOption {
private final Class<? extends HMaster> masterClass;
/**
* Number of region servers to start up.
* If this value is > 1, then make sure config "hbase.regionserver.info.port" is -1
* (i.e. no ui per regionserver) otherwise bind errors.
* Number of region servers to start up. If this value is > 1, then make sure config
* "hbase.regionserver.info.port" is -1 (i.e. no ui per regionserver) otherwise bind errors.
*/
private final int numRegionServers;
/**
@ -93,13 +89,13 @@ public final class StartMiniClusterOption {
private final int numZkServers;
/**
* Whether to create a new root or data directory path. If true, the newly created data directory
* will be configured as HBase rootdir. This will overwrite existing root directory config.
* Whether to create a new root or data directory path. If true, the newly created data directory
* will be configured as HBase rootdir. This will overwrite existing root directory config.
*/
private final boolean createRootDir;
/**
* Whether to create a new WAL directory. If true, the newly created directory will be configured
* Whether to create a new WAL directory. If true, the newly created directory will be configured
* as HBase wal.dir which is separate from HBase rootdir.
*/
private final boolean createWALDir;
@ -172,9 +168,9 @@ public final class StartMiniClusterOption {
public String toString() {
return "StartMiniClusterOption{" + "numMasters=" + numMasters + ", masterClass=" + masterClass
+ ", numRegionServers=" + numRegionServers + ", rsPorts=" + StringUtils.join(rsPorts)
+ ", rsClass=" + rsClass + ", numDataNodes=" + numDataNodes
+ ", dataNodeHosts=" + Arrays.toString(dataNodeHosts) + ", numZkServers=" + numZkServers
+ ", createRootDir=" + createRootDir + ", createWALDir=" + createWALDir + '}';
+ ", rsClass=" + rsClass + ", numDataNodes=" + numDataNodes + ", dataNodeHosts="
+ Arrays.toString(dataNodeHosts) + ", numZkServers=" + numZkServers + ", createRootDir="
+ createRootDir + ", createWALDir=" + createWALDir + '}';
}
/**
@ -185,10 +181,9 @@ public final class StartMiniClusterOption {
}
/**
* Builder pattern for creating an {@link StartMiniClusterOption}.
*
* The default values of its fields should be considered public and constant. Changing the default
* values may cause other tests fail.
* Builder pattern for creating an {@link StartMiniClusterOption}. The default values of its
* fields should be considered public and constant. Changing the default values may cause other
* tests fail.
*/
public static final class Builder {
private int numMasters = 1;
@ -210,7 +205,7 @@ public final class StartMiniClusterOption {
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}
return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass,
return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
createRootDir, createWALDir);
}
@ -260,6 +255,10 @@ public final class StartMiniClusterOption {
return this;
}
public Builder numWorkers(int numWorkers) {
return numDataNodes(numWorkers).numRegionServers(numWorkers);
}
public Builder createRootDir(boolean createRootDir) {
this.createRootDir = createRootDir;
return this;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,30 +17,91 @@
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
public abstract class AbstractTestAsyncTableScan {
protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
.setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build();
protected static final ConnectionRule connectionRule =
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility();
final AsyncConnection conn = connectionRule.getAsyncConnection();
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
testingUtil.waitTableAvailable(TABLE_NAME);
conn.getTable(TABLE_NAME)
.putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i))
.addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList()))
.get();
}
}
@ClassRule
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
.around(miniClusterRule).around(connectionRule).around(new Setup());
@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
@Rule
public final TestName testName = new TestName();
protected static TableName TABLE_NAME = TableName.valueOf("async");
@ -52,53 +113,29 @@ public abstract class AbstractTestAsyncTableScan {
protected static int COUNT = 1000;
protected static AsyncConnection ASYNC_CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList())).get();
}
@AfterClass
public static void tearDown() throws Exception {
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
protected static Scan createNormalScan() {
private static Scan createNormalScan() {
return new Scan();
}
protected static Scan createBatchScan() {
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
protected static Scan createSmallResultSizeScan() {
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
protected static Scan createBatchSmallResultSizeScan() {
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
}
protected static AsyncTable<?> getRawTable() {
return ASYNC_CONN.getTable(TABLE_NAME);
private static AsyncTable<?> getRawTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
}
protected static AsyncTable<?> getTable() {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
private static AsyncTable<?> getTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
@ -132,8 +169,18 @@ public abstract class AbstractTestAsyncTableScan {
protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
/**
* Used by implementation classes to assert the correctness of spans produced under test.
*/
protected abstract void assertTraceContinuity();
/**
* Used by implementation classes to assert the correctness of spans having errors.
*/
protected abstract void assertTraceError(final Matcher<String> exceptionTypeNameMatcher);
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
assertEquals(0, results.size() % 2);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
return Result
@ -144,16 +191,22 @@ public abstract class AbstractTestAsyncTableScan {
}).collect(Collectors.toList());
}
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
}
@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(
rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is " +
rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.forEach(rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is "
+ rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);
@ -170,49 +223,66 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testReversedScanAll() throws Exception {
List<Result> results = doScan(createScan().setReversed(true), -1);
List<Result> results =
TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
assertTraceContinuity();
}
@Test
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results =
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1);
List<Result> results = TraceUtil.trace(
() -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
testName.getMethodName());
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
assertTraceContinuity();
}
@Test
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1);
final Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
assertTraceContinuity();
}
@Test
public void testScanWrongColumnFamily() throws Exception {
try {
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1);
} catch (Exception e) {
assertTrue(e instanceof NoSuchColumnFamilyException ||
e.getCause() instanceof NoSuchColumnFamilyException);
public void testScanWrongColumnFamily() {
final Exception e = assertThrows(Exception.class,
() -> TraceUtil.trace(
() -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
testName.getMethodName()));
// hamcrest generic enforcement for `anyOf` is a pain; skip it
// but -- don't we always unwrap ExecutionExceptions -- bug?
if (e instanceof NoSuchColumnFamilyException) {
final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
assertThat(ex, isA(NoSuchColumnFamilyException.class));
} else if (e instanceof ExecutionException) {
final ExecutionException ex = (ExecutionException) e;
assertThat(ex, allOf(isA(ExecutionException.class),
hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
} else {
fail("Found unexpected Exception " + e);
}
assertTraceError(endsWith(NoSuchColumnFamilyException.class.getName()));
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
int limit) throws Exception {
testScan(start, startInclusive, stop, stopInclusive, limit, -1);
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit, int closeAfter) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
@ -232,9 +302,9 @@ public abstract class AbstractTestAsyncTableScan {
private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
Scan scan = createScan()
.withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
if (limit > 0) {
scan.setLimit(limit);
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
/**
* Advise the scanning infrastructure to collect up to {@code limit} results.
*/
class LimitedScanResultConsumer extends SimpleScanResultConsumerImpl {
private final int limit;
public LimitedScanResultConsumer(int limit) {
this.limit = limit;
}
@Override
public synchronized boolean onNext(Result result) {
return super.onNext(result) && results.size() < limit;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,59 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
final class SimpleScanResultConsumer implements ScanResultConsumer {
/**
* A simplistic {@link ScanResultConsumer} for use in tests.
*/
public interface SimpleScanResultConsumer extends ScanResultConsumer {
private ScanMetrics scanMetrics;
List<Result> getAll() throws Exception;
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
ScanMetrics getScanMetrics();
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
class SimpleScanResultConsumerImpl implements SimpleScanResultConsumer {
private ScanMetrics scanMetrics;
protected final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
@Override
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,24 +17,41 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -59,7 +76,7 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
@ -68,11 +85,13 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
if (scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter);
TracedScanResultConsumer consumer =
new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter));
table.scan(scan, consumer);
results = consumer.getAll();
} else {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
TracedScanResultConsumer consumer =
new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
table.scan(scan, consumer);
results = consumer.getAll();
}
@ -82,49 +101,77 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
return results;
}
private static class LimitedScanResultConsumer implements ScanResultConsumer {
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
private final int limit;
public LimitedScanResultConsumer(int limit) {
this.limit = limit;
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
private final List<Result> results = new ArrayList<>();
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
private Throwable error;
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
private boolean finished = false;
final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return results.size() < limit;
}
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream().filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream().filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
assertThat(spans, hasItem(onErrorMatcher));
spans.stream().filter(onErrorMatcher::matches)
.forEach(span -> assertThat(span, allOf(onErrorMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
}

View File

@ -17,21 +17,40 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -72,4 +91,50 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -123,7 +123,7 @@ public class TestAsyncTableScanMetrics {
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
throws Exception {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl();
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,23 +17,42 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -63,7 +82,8 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
AsyncTable<?> table =
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
@ -84,4 +104,49 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
assertThat(spans,
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,22 +17,42 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -56,8 +76,8 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer();
ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer);
TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
@ -76,4 +96,79 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
}
return results;
}
@Override
protected void assertTraceContinuity() {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
// RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug?
final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher)));
final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream().filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));
final Matcher<SpanData> onCompleteMatcher =
hasName("TracedAdvancedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream().filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
@Override
protected void assertTraceError(Matcher<String> exceptionTypeNameMatcher) {
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(logger::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasExceptionWithType(exceptionTypeNameMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream().filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
}

View File

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ LargeTests.class, ClientTests.class })
public class TestResultScannerTracing {
private static final Logger LOG = LoggerFactory.getLogger(TestResultScannerTracing.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestResultScannerTracing.class);
private static final TableName TABLE_NAME =
TableName.valueOf(TestResultScannerTracing.class.getSimpleName());
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] CQ = Bytes.toBytes("q");
private static final int COUNT = 1000;
private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
.setMiniClusterOption(StartMiniClusterOption.builder().numRegionServers(3).build()).build();
private static final ConnectionRule connectionRule =
ConnectionRule.createConnectionRule(miniClusterRule::createConnection);
private static final class Setup extends ExternalResource {
private Connection conn;
@Override
protected void before() throws Throwable {
final HBaseTestingUtility testUtil = miniClusterRule.getTestingUtility();
conn = testUtil.getConnection();
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
testUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
testUtil.waitTableAvailable(TABLE_NAME);
try (final Table table = conn.getTable(TABLE_NAME)) {
table.put(
IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ, Bytes.toBytes(i))).collect(Collectors.toList()));
}
}
@Override
protected void after() {
try (Admin admin = conn.getAdmin()) {
if (!admin.tableExists(TABLE_NAME)) {
return;
}
admin.disableTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@ClassRule
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
.around(miniClusterRule).around(connectionRule).around(new Setup());
@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
@Rule
public final TestName testName = new TestName();
@Before
public void before() throws Exception {
final Connection conn = connectionRule.getConnection();
try (final RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
locator.clearRegionLocationCache();
}
}
private static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
}
private Scan buildDefaultScan() {
return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1)))
.withStopRow(Bytes.toBytes(String.format("%03d", 998)));
}
private void assertDefaultScan(final Scan scan) {
assertThat(scan.isReversed(), is(false));
assertThat(scan.isAsyncPrefetch(), nullValue());
}
private Scan buildAsyncPrefetchScan() {
return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1)))
.withStopRow(Bytes.toBytes(String.format("%03d", 998))).setAsyncPrefetch(true);
}
private void assertAsyncPrefetchScan(final Scan scan) {
assertThat(scan.isReversed(), is(false));
assertThat(scan.isAsyncPrefetch(), is(true));
}
private Scan buildReversedScan() {
return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 998)))
.withStopRow(Bytes.toBytes(String.format("%03d", 1))).setReversed(true);
}
private void assertReversedScan(final Scan scan) {
assertThat(scan.isReversed(), is(true));
assertThat(scan.isAsyncPrefetch(), nullValue());
}
private void doScan(final Supplier<Scan> spanSupplier, final Consumer<Scan> scanAssertions)
throws Exception {
final Connection conn = connectionRule.getConnection();
final Scan scan = spanSupplier.get();
scanAssertions.accept(scan);
try (final Table table = conn.getTable(TABLE_NAME);
final ResultScanner scanner = table.getScanner(scan)) {
final List<Result> results = new ArrayList<>(COUNT);
scanner.forEach(results::add);
assertThat(results, not(emptyIterable()));
}
}
@Test
public void testNormalScan() throws Exception {
TraceUtil.trace(() -> doScan(this::buildDefaultScan, this::assertDefaultScan),
testName.getMethodName());
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (LOG.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(LOG::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
}
@Test
public void testAsyncPrefetchScan() throws Exception {
TraceUtil.trace(() -> doScan(this::buildAsyncPrefetchScan, this::assertAsyncPrefetchScan),
testName.getMethodName());
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (LOG.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(LOG::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
}
@Test
public void testReversedScan() throws Exception {
TraceUtil.trace(() -> doScan(this::buildReversedScan, this::assertReversedScan),
testName.getMethodName());
final String parentSpanName = testName.getMethodName();
final Matcher<SpanData> parentSpanMatcher =
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);
final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (LOG.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
stringTraceRenderer.render(LOG::debug);
}
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.trace.TraceUtil;
/**
* A drop-in replacement for {@link BufferingScanResultConsumer} that adds tracing spans to its
* implementation of the {@link AdvancedScanResultConsumer} API.
*/
public class TracedAdvancedScanResultConsumer implements AdvancedScanResultConsumer {
private final BufferingScanResultConsumer delegate = new BufferingScanResultConsumer();
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
TraceUtil.trace(() -> delegate.onScanMetricsCreated(scanMetrics),
"TracedAdvancedScanResultConsumer#onScanMetricsCreated");
}
@Override
public void onNext(Result[] results, ScanController controller) {
TraceUtil.trace(() -> delegate.onNext(results, controller),
"TracedAdvancedScanResultConsumer#onNext");
}
@Override
public void onError(Throwable error) {
TraceUtil.trace(() -> delegate.onError(error), "TracedAdvancedScanResultConsumer#onError");
}
@Override
public void onComplete() {
TraceUtil.trace(delegate::onComplete, "TracedAdvancedScanResultConsumer#onComplete");
}
public Result take() throws IOException, InterruptedException {
return delegate.take();
}
public ScanMetrics getScanMetrics() {
return delegate.getScanMetrics();
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.trace.TraceUtil;
/**
* A wrapper over {@link SimpleScanResultConsumer} that adds tracing of spans to its implementation.
*/
class TracedScanResultConsumer implements SimpleScanResultConsumer {
private final SimpleScanResultConsumer delegate;
public TracedScanResultConsumer(final SimpleScanResultConsumer delegate) {
this.delegate = delegate;
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
TraceUtil.trace(() -> delegate.onScanMetricsCreated(scanMetrics),
"TracedScanResultConsumer#onScanMetricsCreated");
}
@Override
public boolean onNext(Result result) {
return TraceUtil.trace(() -> delegate.onNext(result), "TracedScanResultConsumer#onNext");
}
@Override
public void onError(Throwable error) {
TraceUtil.trace(() -> delegate.onError(error), "TracedScanResultConsumer#onError");
}
@Override
public void onComplete() {
TraceUtil.trace(delegate::onComplete, "TracedScanResultConsumer#onComplete");
}
@Override
public List<Result> getAll() throws Exception {
return delegate.getAll();
}
@Override
public ScanMetrics getScanMetrics() {
return delegate.getScanMetrics();
}
}

View File

@ -692,7 +692,8 @@
<hbase-surefire.cygwin-argLine>-enableassertions -Xmx${surefire.cygwinXmx}
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
"-Djava.library.path=${hadoop.library.path};${java.library.path}"
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced</hbase-surefire.cygwin-argLine>
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced
-Dio.opentelemetry.context.enableStrictContext=true</hbase-surefire.cygwin-argLine>
<!-- Surefire argLine defaults to Linux, cygwin argLine is used in the os.windows profile -->
<argLine>${hbase-surefire.argLine}</argLine>
<jacoco.version>0.7.5.201505241946</jacoco.version>