From 5b4bb8217dd4327a89fa29c93ac37bc887d96c2c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 20 Mar 2017 17:12:53 +0800 Subject: [PATCH] HBASE-17691 Add ScanMetrics support for async scan --- .../hbase/client/AsyncClientScanner.java | 34 +++- .../client/AsyncRpcRetryingCallerFactory.java | 24 ++- ...syncScanSingleRegionRpcRetryingCaller.java | 35 +++- .../hadoop/hbase/client/AsyncTableBase.java | 9 +- .../hadoop/hbase/client/AsyncTableImpl.java | 1 + .../hbase/client/AsyncTableResultScanner.java | 9 +- .../hadoop/hbase/client/ClientScanner.java | 8 +- .../hadoop/hbase/client/ConnectionUtils.java | 75 +++++++++ .../hbase/client/RawScanResultConsumer.java | 10 ++ .../hbase/client/ReversedScannerCallable.java | 10 +- .../hbase/client/ScanResultConsumer.java | 9 + .../hadoop/hbase/client/ScannerCallable.java | 88 ++-------- .../client/SimpleRawScanResultConsumer.java | 84 +++++++++ .../client/SimpleScanResultConsumer.java | 75 +++++++++ .../hbase/client/TestAsyncTableScan.java | 42 ----- .../client/TestAsyncTableScanMetrics.java | 159 ++++++++++++++++++ .../hbase/client/TestRawAsyncTableScan.java | 52 ------ 17 files changed, 526 insertions(+), 198 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index fa7aa819e07..2c1693d01c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -51,6 +51,8 @@ class AsyncClientScanner { // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. private final Scan scan; + private final ScanMetrics scanMetrics; + private final RawScanResultConsumer consumer; private final TableName tableName; @@ -88,29 +90,46 @@ class AsyncClientScanner { this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; this.resultCache = createScanResultCache(scan); + if (scan.isScanMetricsEnabled()) { + this.scanMetrics = new ScanMetrics(); + consumer.onScanMetricsCreated(scanMetrics); + } else { + this.scanMetrics = null; + } } private static final class OpenScannerResponse { public final HRegionLocation loc; + public final boolean isRegionServerRemote; + public final ClientService.Interface stub; public final HBaseRpcController controller; public final ScanResponse resp; - public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller, - ScanResponse resp) { + public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, + HBaseRpcController controller, ScanResponse resp) { this.loc = loc; + this.isRegionServerRemote = isRegionServerRemote; this.stub = stub; this.controller = controller; this.resp = resp; } } + private int openScannerTries; + private CompletableFuture callOpenScanner(HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { + boolean isRegionServerRemote = isRemote(loc.getHostname()); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); + if (openScannerTries > 1) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + } + openScannerTries++; CompletableFuture future = new CompletableFuture<>(); try { ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), @@ -120,7 +139,7 @@ class AsyncClientScanner { future.completeExceptionally(controller.getFailed()); return; } - future.complete(new OpenScannerResponse(loc, stub, controller, resp)); + future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); }); } catch (IOException e) { future.completeExceptionally(e); @@ -130,8 +149,9 @@ class AsyncClientScanner { private void startScan(OpenScannerResponse resp) { conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .remote(resp.isRegionServerRemote) .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) - .setScan(scan).consumer(consumer).resultCache(resultCache) + .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) @@ -149,6 +169,8 @@ class AsyncClientScanner { } private void openScanner() { + incRegionCountMetrics(scanMetrics); + openScannerTries = 1; conn.callerFactory. single().table(tableName).row(scan.getStartRow()) .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 08f52fc384b..d71b428fd40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.client; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import io.netty.util.HashedWheelTimer; @@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.ipc.ProtobufRpcEngine.Server; /** * Factory to create an AsyncRpcRetryCaller. @@ -148,6 +148,8 @@ class AsyncRpcRetryingCallerFactory { private Scan scan; + private ScanMetrics scanMetrics; + private ScanResultCache resultCache; private RawScanResultConsumer consumer; @@ -156,6 +158,8 @@ class AsyncRpcRetryingCallerFactory { private HRegionLocation loc; + private boolean isRegionServerRemote; + private long scannerLeaseTimeoutPeriodNs; private long scanTimeoutNs; @@ -172,6 +176,16 @@ class AsyncRpcRetryingCallerFactory { return this; } + public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + return this; + } + + public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { + this.isRegionServerRemote = isRegionServerRemote; + return this; + } + public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { this.resultCache = resultCache; return this; @@ -226,11 +240,11 @@ class AsyncRpcRetryingCallerFactory { public AsyncScanSingleRegionRpcRetryingCaller build() { checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, - checkNotNull(scan, "scan is null"), scannerId, + checkNotNull(scan, "scan is null"), scanMetrics, scannerId, checkNotNull(resultCache, "resultCache is null"), checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, - scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, + pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dd843ed547e..7ed6f031bdf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -73,6 +74,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final Scan scan; + private final ScanMetrics scanMetrics; + private final long scannerId; private final ScanResultCache resultCache; @@ -83,6 +86,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final HRegionLocation loc; + private final boolean regionServerRemote; + private final long scannerLeaseTimeoutPeriodNs; private final long pauseNs; @@ -107,7 +112,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private long nextCallStartNs; - private int tries = 1; + private int tries; private final List exceptions; @@ -279,17 +284,19 @@ class AsyncScanSingleRegionRpcRetryingCaller { } public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, - AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, - RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, - long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { + AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, + ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub, + HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, + long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; + this.scanMetrics = scanMetrics; this.scannerId = scannerId; this.resultCache = resultCache; this.consumer = consumer; this.stub = stub; this.loc = loc; + this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; this.maxAttempts = maxAttempts; @@ -315,6 +322,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void closeScanner() { + incRPCCallsMetrics(scanMetrics, regionServerRemote); resetController(controller, rpcTimeoutNs); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { @@ -345,6 +353,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void completeWhenError(boolean closeScanner) { + incRPCRetriesMetrics(scanMetrics, closeScanner); resultCache.clear(); if (closeScanner) { closeScanner(); @@ -449,12 +458,14 @@ class AsyncScanSingleRegionRpcRetryingCaller { onError(controller.getFailed()); return; } + updateServerSideMetrics(scanMetrics, resp); boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); Result[] results; try { + Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); + updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); results = resultCache.addAndGet( - Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp)) - .orElse(ScanResultCache.EMPTY_RESULT_ARRAY), + Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), isHeartbeatMessage); } catch (IOException e) { // We can not retry here. The server has responded normally and the call sequence has been @@ -464,6 +475,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeWhenError(true); return; } + // calculate this before calling onNext as it is free for user to modify the result array in // onNext. int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results)); @@ -510,6 +522,10 @@ class AsyncScanSingleRegionRpcRetryingCaller { } else { callTimeoutNs = 0L; } + incRPCCallsMetrics(scanMetrics, regionServerRemote); + if (tries > 1) { + incRPCRetriesMetrics(scanMetrics, regionServerRemote); + } resetController(controller, callTimeoutNs); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, false, false, scan.getLimit()); @@ -518,13 +534,14 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void next() { nextCallSeq++; - tries = 0; + tries = 1; exceptions.clear(); nextCallStartNs = System.nanoTime(); call(); } private void renewLease() { + incRPCCallsMetrics(scanMetrics, regionServerRemote); nextCallSeq++; resetController(controller, rpcTimeoutNs); ScanRequest req = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index e201ab23a11..b5a251b3e53 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -322,7 +322,14 @@ public interface AsyncTableBase { * If your result set is very large, you should use other scan method to get a scanner or use * callback to process the results. They will do chunking to prevent OOM. The scanAll method will * fetch all the results and store them in a List and then return the list to you. - * @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large + *

+ * The scan metrics will be collected background if you enable it but you have no way to get it. + * Usually you can get scan metrics from {@code ResultScanner}, or through + * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results. + * So if you really care about scan metrics then you'd better use other scan methods which return + * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no + * performance difference between these scan methods so do not worry. + * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large * result set, it is likely to cause OOM. * @return The results of this small scan operation. The return value will be wrapped by a * {@link CompletableFuture}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index f1625ad705c..29c06986319 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -162,6 +162,7 @@ class AsyncTableImpl implements AsyncTable { private void scan0(Scan scan, ScanResultConsumer consumer) { try (ResultScanner scanner = getScanner(scan)) { + consumer.onScanMetricsCreated(scanner.getScanMetrics()); for (Result result; (result = scanner.next()) != null;) { if (!consumer.onNext(result)) { break; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index eef797cde41..b6823f90b7e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -48,6 +48,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final Queue queue = new ArrayDeque<>(); + private ScanMetrics scanMetrics; + private long cacheSize; private boolean closed = false; @@ -110,6 +112,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { notifyAll(); } + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + private void resumePrefetch() { if (LOG.isDebugEnabled()) { LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); @@ -168,6 +175,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { @Override public ScanMetrics getScanMetrics() { - throw new UnsupportedOperationException(); + return scanMetrics; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index a8b029ff432..8aa5c53a2ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows; import com.google.common.annotations.VisibleForTesting; @@ -250,9 +251,7 @@ public abstract class ClientScanner extends AbstractClientScanner { new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); this.callable.setCaching(this.caching); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } + incRegionCountMetrics(scanMetrics); return true; } @@ -460,7 +459,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. - Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); + Result[] resultsToAddToCache = + scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); if (resultsToAddToCache.length > 0) { for (Result rs : resultsToAddToCache) { cache.add(rs); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 3e7cd00c38b..f54f552523c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -47,16 +47,20 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; /** * Utility used by client connections. @@ -424,4 +428,75 @@ public final class ConnectionUtils { return new CompleteScanResultCache(); } } + + private static final String MY_ADDRESS = getMyAddress(); + + private static String getMyAddress() { + try { + return DNS.getDefaultHost("default", "default"); + } catch (UnknownHostException uhe) { + LOG.error("cannot determine my address", uhe); + return null; + } + } + + static boolean isRemote(String host) { + return !host.equalsIgnoreCase(MY_ADDRESS); + } + + static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { + if (scanMetrics == null) { + return; + } + scanMetrics.countOfRPCcalls.incrementAndGet(); + if (isRegionServerRemote) { + scanMetrics.countOfRemoteRPCcalls.incrementAndGet(); + } + } + + static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { + if (scanMetrics == null) { + return; + } + scanMetrics.countOfRPCRetries.incrementAndGet(); + if (isRegionServerRemote) { + scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); + } + } + + static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs, + boolean isRegionServerRemote) { + if (scanMetrics == null || rrs == null || rrs.length == 0) { + return; + } + long resultSize = 0; + for (Result rr : rrs) { + for (Cell cell : rr.rawCells()) { + resultSize += CellUtil.estimatedSerializedSizeOf(cell); + } + } + scanMetrics.countOfBytesInResults.addAndGet(resultSize); + if (isRegionServerRemote) { + scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize); + } + } + + /** + * Use the scan metrics returned by the server to add to the identically named counters in the + * client side metrics. If a counter does not exist with the same name as the server side metric, + * the attempt to increase the counter will fail. + */ + static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) { + if (scanMetrics == null || response == null || !response.hasScanMetrics()) { + return; + } + ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter); + } + + static void incRegionCountMetrics(ScanMetrics scanMetrics) { + if (scanMetrics == null) { + return; + } + scanMetrics.countOfRegions.incrementAndGet(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 17e0afa0c86..899c0bb6666 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; /** * Receives {@link Result} for an asynchronous scan. @@ -112,4 +113,13 @@ public interface RawScanResultConsumer { * Indicate that the scan operation is completed normally. */ void onComplete(); + + /** + * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to + * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan + * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can + * store it somewhere to get the metrics at any time if you want. + */ + default void onScanMetricsCreated(ScanMetrics scanMetrics) { + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 1d46ab480aa..538fe30a0e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -18,7 +18,8 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; import java.io.IOException; @@ -113,11 +114,8 @@ public class ReversedScannerCallable extends ScannerCallable { } // check how often we retry. - if (reload && this.scanMetrics != null) { - this.scanMetrics.countOfRPCRetries.incrementAndGet(); - if (isRegionServerRemote) { - this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); - } + if (reload) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java index 770a87fb407..03b1ba09447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; /** * Receives {@link Result} for an asynchronous scan. @@ -45,4 +46,12 @@ public interface ScanResultConsumer { */ void onComplete(); + /** + * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to + * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan + * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can + * store it somewhere to get the metrics at any time if you want. + */ + default void onScanMetricsCreated(ScanMetrics scanMetrics) { + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0682a7ac31e..ffac566e23e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -18,17 +18,18 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; +import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; + import java.io.IOException; import java.io.InterruptedIOException; -import java.net.UnknownHostException; -import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionInfo; @@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.net.DNS; /** * Scanner operations such as create, next, etc. @@ -72,7 +72,6 @@ public class ScannerCallable extends ClientServiceCallable { protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; - private static String myAddress; protected final int id; enum MoreResults { @@ -87,13 +86,6 @@ public class ScannerCallable extends ClientServiceCallable { * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} */ protected boolean heartbeatMessage = false; - static { - try { - myAddress = DNS.getDefaultHost("default", "default"); - } catch (UnknownHostException uhe) { - LOG.error("cannot determine my address", uhe); - } - } // indicate if it is a remote server call protected boolean isRegionServerRemote = true; @@ -158,30 +150,23 @@ public class ScannerCallable extends ClientServiceCallable { } // check how often we retry. - if (reload && this.scanMetrics != null) { - this.scanMetrics.countOfRPCRetries.incrementAndGet(); - if (isRegionServerRemote) { - this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); - } + if (reload) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); } } /** - * compare the local machine hostname with region server's hostname - * to decide if hbase client connects to a remote region server + * compare the local machine hostname with region server's hostname to decide if hbase client + * connects to a remote region server */ protected void checkIfRegionServerIsRemote() { - if (getLocation().getHostname().equalsIgnoreCase(myAddress)) { - isRegionServerRemote = false; - } else { - isRegionServerRemote = true; - } + isRegionServerRemote = isRemote(getLocation().getHostname()); } private ScanResponse next() throws IOException { // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server setHeartbeatMessage(false); - incRPCcallsMetrics(); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew, scan.getLimit()); try { @@ -267,7 +252,7 @@ public class ScannerCallable extends ClientServiceCallable { + scannerId); } } - updateServerSideMetrics(response); + updateServerSideMetrics(scanMetrics, response); // moreResults is only used for the case where a filter exhausts all elements if (response.hasMoreResults()) { if (response.getMoreResults()) { @@ -289,7 +274,7 @@ public class ScannerCallable extends ClientServiceCallable { } else { setMoreResultsInRegion(MoreResults.UNKNOWN); } - updateResultsMetrics(rrs); + updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); return rrs; } @@ -307,53 +292,12 @@ public class ScannerCallable extends ClientServiceCallable { this.heartbeatMessage = heartbeatMessage; } - private void incRPCcallsMetrics() { - if (this.scanMetrics == null) { - return; - } - this.scanMetrics.countOfRPCcalls.incrementAndGet(); - if (isRegionServerRemote) { - this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet(); - } - } - - protected void updateResultsMetrics(Result[] rrs) { - if (this.scanMetrics == null || rrs == null || rrs.length == 0) { - return; - } - long resultSize = 0; - for (Result rr : rrs) { - for (Cell cell : rr.rawCells()) { - resultSize += CellUtil.estimatedSerializedSizeOf(cell); - } - } - this.scanMetrics.countOfBytesInResults.addAndGet(resultSize); - if (isRegionServerRemote) { - this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize); - } - } - - /** - * Use the scan metrics returned by the server to add to the identically named counters in the - * client side metrics. If a counter does not exist with the same name as the server side metric, - * the attempt to increase the counter will fail. - * @param response - */ - private void updateServerSideMetrics(ScanResponse response) { - if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return; - - Map serverMetrics = ResponseConverter.getScanMetrics(response); - for (Entry entry : serverMetrics.entrySet()) { - this.scanMetrics.addToCounter(entry.getKey(), entry.getValue()); - } - } - private void close() { if (this.scannerId == -1L) { return; } try { - incRPCcallsMetrics(); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { @@ -371,7 +315,7 @@ public class ScannerCallable extends ClientServiceCallable { } private ScanResponse openScanner() throws IOException { - incRPCcallsMetrics(); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); ScanRequest request = RequestConverter.buildScanRequest( getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java new file mode 100644 index 00000000000..026a21f89e4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; + +class SimpleRawScanResultConsumer implements RawScanResultConsumer { + + private ScanMetrics scanMetrics; + + private final Queue queue = new ArrayDeque<>(); + + private boolean finished; + + private Throwable error; + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized void onNext(Result[] results, ScanController controller) { + for (Result result : results) { + queue.offer(result); + } + notifyAll(); + } + + @Override + public synchronized void onError(Throwable error) { + finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized Result take() throws IOException, InterruptedException { + for (;;) { + if (!queue.isEmpty()) { + return queue.poll(); + } + if (finished) { + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } else { + return null; + } + } + wait(); + } + } + + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java new file mode 100644 index 00000000000..168129d53d8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Throwables; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; + +final class SimpleScanResultConsumer implements ScanResultConsumer { + + private ScanMetrics scanMetrics; + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return true; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index a8aad0b09d3..2e645933cf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.base.Throwables; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; @@ -37,45 +34,6 @@ import org.junit.runners.Parameterized.Parameters; @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScan extends AbstractTestAsyncTableScan { - private static final class SimpleScanResultConsumer implements ScanResultConsumer { - - private final List results = new ArrayList<>(); - - private Throwable error; - - private boolean finished = false; - - @Override - public synchronized boolean onNext(Result result) { - results.add(result); - return true; - } - - @Override - public synchronized void onError(Throwable error) { - this.error = error; - finished = true; - notifyAll(); - } - - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); - } - - public synchronized List getAll() throws Exception { - while (!finished) { - wait(); - } - if (error != null) { - Throwables.propagateIfPossible(error, Exception.class); - throw new Exception(error); - } - return results; - } - } - @Parameter(0) public String scanType; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java new file mode 100644 index 00000000000..b877dace916 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ForkJoinPool; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanMetrics { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection CONN; + + private static int NUM_REGIONS; + + @FunctionalInterface + private interface ScanWithMetrics { + Pair, ScanMetrics> scan(Scan scan) throws Exception; + } + + @Parameter(0) + public String methodName; + + @Parameter(1) + public ScanWithMetrics method; + + @Parameters(name = "{index}: scan={0}") + public static List params() { + ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable; + ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan; + ScanWithMetrics doScanWithAsyncTableScanner = + TestAsyncTableScanMetrics::doScanWithAsyncTableScanner; + return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable }, + new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan }, + new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner }); + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + // Create 3 rows in the table, with rowkeys starting with "zzz*" so that + // scan are forced to hit all the regions. + try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { + table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE))); + } + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); + } + + @AfterClass + public static void tearDown() throws Exception { + IOUtils.closeQuietly(CONN); + UTIL.shutdownMiniCluster(); + } + + private static Pair, ScanMetrics> doScanWithRawAsyncTable(Scan scan) + throws IOException, InterruptedException { + SimpleRawScanResultConsumer consumer = new SimpleRawScanResultConsumer(); + CONN.getRawTable(TABLE_NAME).scan(scan, consumer); + List results = new ArrayList<>(); + for (Result result; (result = consumer.take()) != null;) { + results.add(result); + } + return Pair.newPair(results, consumer.getScanMetrics()); + } + + private static Pair, ScanMetrics> doScanWithAsyncTableScan(Scan scan) + throws Exception { + SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer); + return Pair.newPair(consumer.getAll(), consumer.getScanMetrics()); + } + + private static Pair, ScanMetrics> doScanWithAsyncTableScanner(Scan scan) + throws IOException { + try (ResultScanner scanner = + CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) { + List results = new ArrayList<>(); + for (Result result; (result = scanner.next()) != null;) { + results.add(result); + } + return Pair.newPair(results, scanner.getScanMetrics()); + } + } + + @Test + public void testNoScanMetrics() throws Exception { + Pair, ScanMetrics> pair = method.scan(new Scan()); + assertEquals(3, pair.getFirst().size()); + assertNull(pair.getSecond()); + } + + @Test + public void testScanMetrics() throws Exception { + Pair, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true)); + List results = pair.getFirst(); + assertEquals(3, results.size()); + long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream()) + .mapToLong(c -> CellUtil.estimatedSerializedSizeOf(c)).sum(); + ScanMetrics scanMetrics = pair.getSecond(); + assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); + assertEquals(bytes, scanMetrics.countOfBytesInResults.get()); + assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get()); + // also assert a server side metric to ensure that we have published them into the client side + // metrics. + assertEquals(3, scanMetrics.countOfRowsScanned.get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 72179c8eb96..5311ca21bf5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -17,13 +17,8 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.base.Throwables; - -import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; -import java.util.Queue; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -39,53 +34,6 @@ import org.junit.runners.Parameterized.Parameters; @Category({ MediumTests.class, ClientTests.class }) public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { - private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer { - - private final Queue queue = new ArrayDeque<>(); - - private boolean finished; - - private Throwable error; - - @Override - public synchronized void onNext(Result[] results, ScanController controller) { - for (Result result : results) { - queue.offer(result); - } - notifyAll(); - } - - @Override - public synchronized void onError(Throwable error) { - finished = true; - this.error = error; - notifyAll(); - } - - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); - } - - public synchronized Result take() throws IOException, InterruptedException { - for (;;) { - if (!queue.isEmpty()) { - return queue.poll(); - } - if (finished) { - if (error != null) { - Throwables.propagateIfPossible(error, IOException.class); - throw new IOException(error); - } else { - return null; - } - } - wait(); - } - } - } - @Parameter(0) public String scanType;