HBASE-17691 Add ScanMetrics support for async scan

This commit is contained in:
zhangduo 2017-03-20 17:12:53 +08:00
parent 7c03a213ff
commit 5b4bb8217d
17 changed files with 526 additions and 198 deletions

View File

@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@ -51,6 +51,8 @@ class AsyncClientScanner {
// AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
private final Scan scan;
private final ScanMetrics scanMetrics;
private final RawScanResultConsumer consumer;
private final TableName tableName;
@ -88,29 +90,46 @@ class AsyncClientScanner {
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
} else {
this.scanMetrics = null;
}
}
private static final class OpenScannerResponse {
public final HRegionLocation loc;
public final boolean isRegionServerRemote;
public final ClientService.Interface stub;
public final HBaseRpcController controller;
public final ScanResponse resp;
public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
ScanResponse resp) {
public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
HBaseRpcController controller, ScanResponse resp) {
this.loc = loc;
this.isRegionServerRemote = isRegionServerRemote;
this.stub = stub;
this.controller = controller;
this.resp = resp;
}
}
private int openScannerTries;
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
openScannerTries++;
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
@ -120,7 +139,7 @@ class AsyncClientScanner {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, stub, controller, resp));
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
@ -130,8 +149,9 @@ class AsyncClientScanner {
private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).consumer(consumer).resultCache(resultCache)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
@ -149,6 +169,8 @@ class AsyncClientScanner {
}
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries = 1;
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import io.netty.util.HashedWheelTimer;
@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
/**
* Factory to create an AsyncRpcRetryCaller.
@ -148,6 +148,8 @@ class AsyncRpcRetryingCallerFactory {
private Scan scan;
private ScanMetrics scanMetrics;
private ScanResultCache resultCache;
private RawScanResultConsumer consumer;
@ -156,6 +158,8 @@ class AsyncRpcRetryingCallerFactory {
private HRegionLocation loc;
private boolean isRegionServerRemote;
private long scannerLeaseTimeoutPeriodNs;
private long scanTimeoutNs;
@ -172,6 +176,16 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
return this;
}
public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
this.isRegionServerRemote = isRegionServerRemote;
return this;
}
public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
this.resultCache = resultCache;
return this;
@ -226,11 +240,11 @@ class AsyncRpcRetryingCallerFactory {
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
checkNotNull(scan, "scan is null"), scannerId,
checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts,
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@ -73,6 +74,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final Scan scan;
private final ScanMetrics scanMetrics;
private final long scannerId;
private final ScanResultCache resultCache;
@ -83,6 +86,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final HRegionLocation loc;
private final boolean regionServerRemote;
private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
@ -107,7 +112,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private long nextCallStartNs;
private int tries = 1;
private int tries;
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
@ -279,17 +284,19 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
RawScanResultConsumer consumer, Interface stub, HRegionLocation loc,
long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub,
HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
this.resultCache = resultCache;
this.consumer = consumer;
this.stub = stub;
this.loc = loc;
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
@ -315,6 +322,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
@ -345,6 +353,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
private void completeWhenError(boolean closeScanner) {
incRPCRetriesMetrics(scanMetrics, closeScanner);
resultCache.clear();
if (closeScanner) {
closeScanner();
@ -449,12 +458,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
onError(controller.getFailed());
return;
}
updateServerSideMetrics(scanMetrics, resp);
boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
Result[] results;
try {
Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
results = resultCache.addAndGet(
Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
.orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
isHeartbeatMessage);
} catch (IOException e) {
// We can not retry here. The server has responded normally and the call sequence has been
@ -464,6 +475,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenError(true);
return;
}
// calculate this before calling onNext as it is free for user to modify the result array in
// onNext.
int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
@ -510,6 +522,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
} else {
callTimeoutNs = 0L;
}
incRPCCallsMetrics(scanMetrics, regionServerRemote);
if (tries > 1) {
incRPCRetriesMetrics(scanMetrics, regionServerRemote);
}
resetController(controller, callTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, false, false, scan.getLimit());
@ -518,13 +534,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void next() {
nextCallSeq++;
tries = 0;
tries = 1;
exceptions.clear();
nextCallStartNs = System.nanoTime();
call();
}
private void renewLease() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
nextCallSeq++;
resetController(controller, rpcTimeoutNs);
ScanRequest req =

View File

@ -322,7 +322,14 @@ public interface AsyncTableBase {
* If your result set is very large, you should use other scan method to get a scanner or use
* callback to process the results. They will do chunking to prevent OOM. The scanAll method will
* fetch all the results and store them in a List and then return the list to you.
* @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large
* <p>
* The scan metrics will be collected background if you enable it but you have no way to get it.
* Usually you can get scan metrics from {@code ResultScanner}, or through
* {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
* So if you really care about scan metrics then you'd better use other scan methods which return
* a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
* performance difference between these scan methods so do not worry.
* @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
* result set, it is likely to cause OOM.
* @return The results of this small scan operation. The return value will be wrapped by a
* {@link CompletableFuture}.

View File

@ -162,6 +162,7 @@ class AsyncTableImpl implements AsyncTable {
private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;

View File

@ -48,6 +48,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private final Queue<Result> queue = new ArrayDeque<>();
private ScanMetrics scanMetrics;
private long cacheSize;
private boolean closed = false;
@ -110,6 +112,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
notifyAll();
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
private void resumePrefetch() {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
@ -168,6 +175,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
@Override
public ScanMetrics getScanMetrics() {
throw new UnsupportedOperationException();
return scanMetrics;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
@ -250,9 +251,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching);
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
incRegionCountMetrics(scanMetrics);
return true;
}
@ -460,7 +459,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
Result[] resultsToAddToCache =
scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
if (resultsToAddToCache.length > 0) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);

View File

@ -47,16 +47,20 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
/**
* Utility used by client connections.
@ -424,4 +428,75 @@ public final class ConnectionUtils {
return new CompleteScanResultCache();
}
}
private static final String MY_ADDRESS = getMyAddress();
private static String getMyAddress() {
try {
return DNS.getDefaultHost("default", "default");
} catch (UnknownHostException uhe) {
LOG.error("cannot determine my address", uhe);
return null;
}
}
static boolean isRemote(String host) {
return !host.equalsIgnoreCase(MY_ADDRESS);
}
static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
if (scanMetrics == null) {
return;
}
scanMetrics.countOfRPCcalls.incrementAndGet();
if (isRegionServerRemote) {
scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
}
}
static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
if (scanMetrics == null) {
return;
}
scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
}
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
boolean isRegionServerRemote) {
if (scanMetrics == null || rrs == null || rrs.length == 0) {
return;
}
long resultSize = 0;
for (Result rr : rrs) {
for (Cell cell : rr.rawCells()) {
resultSize += CellUtil.estimatedSerializedSizeOf(cell);
}
}
scanMetrics.countOfBytesInResults.addAndGet(resultSize);
if (isRegionServerRemote) {
scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
}
}
/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
*/
static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
return;
}
ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
}
static void incRegionCountMetrics(ScanMetrics scanMetrics) {
if (scanMetrics == null) {
return;
}
scanMetrics.countOfRegions.incrementAndGet();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Receives {@link Result} for an asynchronous scan.
@ -112,4 +113,13 @@ public interface RawScanResultConsumer {
* Indicate that the scan operation is completed normally.
*/
void onComplete();
/**
* If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
* all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
* operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
* store it somewhere to get the metrics at any time if you want.
*/
default void onScanMetricsCreated(ScanMetrics scanMetrics) {
}
}

View File

@ -18,7 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import java.io.IOException;
@ -113,11 +114,8 @@ public class ReversedScannerCallable extends ScannerCallable {
}
// check how often we retry.
if (reload && this.scanMetrics != null) {
this.scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
if (reload) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Receives {@link Result} for an asynchronous scan.
@ -45,4 +46,12 @@ public interface ScanResultConsumer {
*/
void onComplete();
/**
* If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
* all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
* operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
* store it somewhere to get the metrics at any time if you want.
*/
default void onScanMetricsCreated(ScanMetrics scanMetrics) {
}
}

View File

@ -18,17 +18,18 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.net.DNS;
/**
* Scanner operations such as create, next, etc.
@ -72,7 +72,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
private static String myAddress;
protected final int id;
enum MoreResults {
@ -87,13 +86,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
* Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
*/
protected boolean heartbeatMessage = false;
static {
try {
myAddress = DNS.getDefaultHost("default", "default");
} catch (UnknownHostException uhe) {
LOG.error("cannot determine my address", uhe);
}
}
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
@ -158,30 +150,23 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
}
// check how often we retry.
if (reload && this.scanMetrics != null) {
this.scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
if (reload) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
}
/**
* compare the local machine hostname with region server's hostname
* to decide if hbase client connects to a remote region server
* compare the local machine hostname with region server's hostname to decide if hbase client
* connects to a remote region server
*/
protected void checkIfRegionServerIsRemote() {
if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
isRegionServerRemote = false;
} else {
isRegionServerRemote = true;
}
isRegionServerRemote = isRemote(getLocation().getHostname());
}
private ScanResponse next() throws IOException {
// Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
setHeartbeatMessage(false);
incRPCcallsMetrics();
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null, renew, scan.getLimit());
try {
@ -267,7 +252,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
+ scannerId);
}
}
updateServerSideMetrics(response);
updateServerSideMetrics(scanMetrics, response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults()) {
if (response.getMoreResults()) {
@ -289,7 +274,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
} else {
setMoreResultsInRegion(MoreResults.UNKNOWN);
}
updateResultsMetrics(rrs);
updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
return rrs;
}
@ -307,53 +292,12 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
this.heartbeatMessage = heartbeatMessage;
}
private void incRPCcallsMetrics() {
if (this.scanMetrics == null) {
return;
}
this.scanMetrics.countOfRPCcalls.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
}
}
protected void updateResultsMetrics(Result[] rrs) {
if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
return;
}
long resultSize = 0;
for (Result rr : rrs) {
for (Cell cell : rr.rawCells()) {
resultSize += CellUtil.estimatedSerializedSizeOf(cell);
}
}
this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
if (isRegionServerRemote) {
this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
}
}
/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
* @param response
*/
private void updateServerSideMetrics(ScanResponse response) {
if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
for (Entry<String, Long> entry : serverMetrics.entrySet()) {
this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
}
}
private void close() {
if (this.scannerId == -1L) {
return;
}
try {
incRPCcallsMetrics();
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try {
@ -371,7 +315,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
}
private ScanResponse openScanner() throws IOException {
incRPCcallsMetrics();
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
try {

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
class SimpleRawScanResultConsumer implements RawScanResultConsumer {
private ScanMetrics scanMetrics;
private final Queue<Result> queue = new ArrayDeque<>();
private boolean finished;
private Throwable error;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized void onNext(Result[] results, ScanController controller) {
for (Result result : results) {
queue.offer(result);
}
notifyAll();
}
@Override
public synchronized void onError(Throwable error) {
finished = true;
this.error = error;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized Result take() throws IOException, InterruptedException {
for (;;) {
if (!queue.isEmpty()) {
return queue.poll();
}
if (finished) {
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
} else {
return null;
}
}
wait();
}
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
final class SimpleScanResultConsumer implements ScanResultConsumer {
private ScanMetrics scanMetrics;
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
@ -37,45 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
private static final class SimpleScanResultConsumer implements ScanResultConsumer {
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
}
@Parameter(0)
public String scanType;

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableScanMetrics {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
private static final byte[] CF = Bytes.toBytes("cf");
private static final byte[] CQ = Bytes.toBytes("cq");
private static final byte[] VALUE = Bytes.toBytes("value");
private static AsyncConnection CONN;
private static int NUM_REGIONS;
@FunctionalInterface
private interface ScanWithMetrics {
Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
}
@Parameter(0)
public String methodName;
@Parameter(1)
public ScanWithMetrics method;
@Parameters(name = "{index}: scan={0}")
public static List<Object[]> params() {
ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
ScanWithMetrics doScanWithAsyncTableScanner =
TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable },
new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner });
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
// Create 3 rows in the table, with rowkeys starting with "zzz*" so that
// scan are forced to hit all the regions.
try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE),
new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
}
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
}
@AfterClass
public static void tearDown() throws Exception {
IOUtils.closeQuietly(CONN);
UTIL.shutdownMiniCluster();
}
private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
throws IOException, InterruptedException {
SimpleRawScanResultConsumer consumer = new SimpleRawScanResultConsumer();
CONN.getRawTable(TABLE_NAME).scan(scan, consumer);
List<Result> results = new ArrayList<>();
for (Result result; (result = consumer.take()) != null;) {
results.add(result);
}
return Pair.newPair(results, consumer.getScanMetrics());
}
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
throws Exception {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
}
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
throws IOException {
try (ResultScanner scanner =
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
List<Result> results = new ArrayList<>();
for (Result result; (result = scanner.next()) != null;) {
results.add(result);
}
return Pair.newPair(results, scanner.getScanMetrics());
}
}
@Test
public void testNoScanMetrics() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
assertEquals(3, pair.getFirst().size());
assertNull(pair.getSecond());
}
@Test
public void testScanMetrics() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
.mapToLong(c -> CellUtil.estimatedSerializedSizeOf(c)).sum();
ScanMetrics scanMetrics = pair.getSecond();
assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
// also assert a server side metric to ensure that we have published them into the client side
// metrics.
assertEquals(3, scanMetrics.countOfRowsScanned.get());
}
}

View File

@ -17,13 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -39,53 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ MediumTests.class, ClientTests.class })
public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer {
private final Queue<Result> queue = new ArrayDeque<>();
private boolean finished;
private Throwable error;
@Override
public synchronized void onNext(Result[] results, ScanController controller) {
for (Result result : results) {
queue.offer(result);
}
notifyAll();
}
@Override
public synchronized void onError(Throwable error) {
finished = true;
this.error = error;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized Result take() throws IOException, InterruptedException {
for (;;) {
if (!queue.isEmpty()) {
return queue.poll();
}
if (finished) {
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
} else {
return null;
}
}
wait();
}
}
}
@Parameter(0)
public String scanType;