HBASE-22244 Make use of MetricsConnection in async client

This commit is contained in:
zhangduo 2019-04-17 21:45:54 +08:00 committed by Apache9
parent f4aaf735e4
commit a3d2a2df3a
10 changed files with 95 additions and 45 deletions

View File

@ -197,10 +197,9 @@ class AsyncClientScanner {
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
addListener(
timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
(resp, error) -> {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;

View File

@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile boolean closed = false;
private final Optional<MetricsConnection> metrics;
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
User user) {
this.conf = conf;
@ -112,7 +116,12 @@ class AsyncConnectionImpl implements AsyncConnection {
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
@ -148,6 +157,7 @@ class AsyncConnectionImpl implements AsyncConnection {
if (authService != null) {
authService.shutdown();
}
metrics.ifPresent(MetricsConnection::shutdown);
closed = true;
}
@ -312,4 +322,8 @@ class AsyncConnectionImpl implements AsyncConnection {
public void clearRegionLocationCache() {
locator.clearCache();
}
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}
}

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
@ -106,7 +107,7 @@ class AsyncMetaRegionLocator {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
this::addLocationToCache, this::removeLocationFromCache);
this::addLocationToCache, this::removeLocationFromCache, Optional.empty());
}
void clearCache() {

View File

@ -338,15 +338,25 @@ class AsyncNonMetaRegionLocator {
return true;
}
private void recordCacheHit() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
}
private void recordCacheMiss() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
}
private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
@ -355,8 +365,10 @@ class AsyncNonMetaRegionLocator {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
}
}
@ -367,11 +379,13 @@ class AsyncNonMetaRegionLocator {
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
@ -380,8 +394,10 @@ class AsyncNonMetaRegionLocator {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
}
}
@ -529,6 +545,10 @@ class AsyncNonMetaRegionLocator {
return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
}
private void recordClearRegionCache() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
}
private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
@ -544,10 +564,12 @@ class AsyncNonMetaRegionLocator {
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
recordClearRegionCache();
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
recordClearRegionCache();
return;
}
}
@ -569,7 +591,7 @@ class AsyncNonMetaRegionLocator {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
this::addLocationToCache, this::removeLocationFromCache);
this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
}
void clearCache(TableName tableName) {
@ -583,6 +605,8 @@ class AsyncNonMetaRegionLocator {
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
}
}
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}
void clearCache() {

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
@ -51,7 +52,8 @@ final class AsyncRegionLocatorHelper {
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
Optional<MetricsConnection> metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@ -78,6 +80,7 @@ final class AsyncRegionLocatorHelper {
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
removeFromCache.accept(loc);
}
}

View File

@ -190,10 +190,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// thread executor shared by all Table instances created
// by this connection
private volatile ExecutorService batchPool = null;
private volatile ThreadPoolExecutor batchPool = null;
// meta thread executor shared by all Table instances created
// by this connection
private volatile ExecutorService metaLookupPool = null;
private volatile ThreadPoolExecutor metaLookupPool = null;
private volatile boolean cleanupPool = false;
private final Configuration conf;
@ -238,14 +238,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* constructor
* @param conf Configuration object
*/
ConnectionImplementation(Configuration conf,
ExecutorService pool, User user) throws IOException {
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
this.conf = conf;
this.user = user;
if (user != null && user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.batchPool = pool;
this.batchPool = (ThreadPoolExecutor) pool;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@ -286,7 +285,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
this.metrics =
new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
} else {
this.metrics = null;
}
@ -461,7 +461,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.metrics;
}
private ExecutorService getBatchPool() {
private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
if (batchPool == null) {
@ -474,7 +474,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.batchPool;
}
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
BlockingQueue<Runnable> passedWorkQueue) {
// shared HTable thread executor not yet initialized
if (maxThreads == 0) {
@ -503,7 +503,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return tpe;
}
private ExecutorService getMetaLookupPool() {
private ThreadPoolExecutor getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
if (this.metaLookupPool == null) {

View File

@ -28,6 +28,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@ -500,13 +501,19 @@ public final class ConnectionUtils {
/**
* Connect the two futures, if the src future is done, then mark the dst future as done. And if
* the dst future is done, then cancel the src future. This is used for timeline consistent read.
* <p/>
* Pass empty metrics if you want to link the primary future and the dst future so we will not
* increase the hedge read related metrics.
*/
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture,
Optional<MetricsConnection> metrics) {
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
} else {
dstFuture.complete(r);
if (dstFuture.complete(r)) {
metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
}
}
});
// The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
@ -519,7 +526,7 @@ public final class ConnectionUtils {
private static <T> void sendRequestsToSecondaryReplicas(
Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
CompletableFuture<T> future) {
CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
if (future.isDone()) {
// do not send requests to secondary replicas if the future is done, i.e, the primary request
// has already been finished.
@ -527,14 +534,15 @@ public final class ConnectionUtils {
}
for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
connect(secondaryFuture, future);
metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
connect(secondaryFuture, future, metrics);
}
}
static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
TableName tableName, Query query, byte[] row, RegionLocateType locateType,
Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
long primaryCallTimeoutNs, Timer retryTimer) {
long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) {
if (query.getConsistency() == Consistency.STRONG) {
return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@ -545,7 +553,7 @@ public final class ConnectionUtils {
// Timeline consistent read, where we may send requests to other region replicas
CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
CompletableFuture<T> future = new CompletableFuture<>();
connect(primaryFuture, future);
connect(primaryFuture, future, Optional.empty());
long startNs = System.nanoTime();
// after the getRegionLocations, all the locations for the replicas of this region should have
// been cached, so it is not big deal to locate them again when actually sending requests to
@ -567,11 +575,11 @@ public final class ConnectionUtils {
}
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
if (delayNs <= 0) {
sendRequestsToSecondaryReplicas(requestReplica, locs, future);
sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics);
} else {
retryTimer.newTimeout(
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs,
TimeUnit.NANOSECONDS);
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics),
delayNs, TimeUnit.NANOSECONDS);
}
});
return future;

View File

@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@ -305,30 +305,30 @@ public class MetricsConnection implements StatisticTrackable {
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
MetricsConnection(final ConnectionImplementation conn) {
this.scope = conn.toString();
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(),
new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
if (batchPool == null) {
ThreadPoolExecutor pool = batchPool.get();
if (pool == null) {
return Ratio.of(0, 0);
}
return Ratio.of(batchPool.getActiveCount(), batchPool.getMaximumPoolSize());
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
});
this.registry.register(getMetaPoolName(),
new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
if (metaPool == null) {
ThreadPoolExecutor pool = metaPool.get();
if (pool == null) {
return Ratio.of(0, 0);
}
return Ratio.of(metaPool.getActiveCount(), metaPool.getMaximumPoolSize());
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
});
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
@ -401,6 +401,11 @@ public class MetricsConnection implements StatisticTrackable {
metaCacheNumClearRegion.inc();
}
/** Increment the number of meta cache drops requested for individual region. */
public void incrMetaCacheNumClearRegion(int count) {
metaCacheNumClearRegion.inc(count);
}
/** Increment the number of hedged read that have occurred. */
public void incrHedgedReadOps() {
hedgedReadOps.inc();

View File

@ -232,7 +232,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Result> get(Get get) {
return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
}
@Override

View File

@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
@ -35,7 +34,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@ -57,13 +55,11 @@ public class TestMetricsConnection {
HBaseClassTestRule.forClass(TestMetricsConnection.class);
private static MetricsConnection METRICS;
private static final ExecutorService BATCH_POOL = Executors.newFixedThreadPool(2);
private static final ThreadPoolExecutor BATCH_POOL =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
@BeforeClass
public static void beforeClass() {
ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class);
Mockito.when(mocked.toString()).thenReturn("mocked-connection");
Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL);
METRICS = new MetricsConnection(mocked);
METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
}
@AfterClass