HBASE-22244 Make use of MetricsConnection in async client
This commit is contained in:
parent
22fca940be
commit
cbe4ec2c51
|
@ -197,10 +197,9 @@ class AsyncClientScanner {
|
|||
private void openScanner() {
|
||||
incRegionCountMetrics(scanMetrics);
|
||||
openScannerTries.set(1);
|
||||
addListener(
|
||||
timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
|
||||
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
|
||||
(resp, error) -> {
|
||||
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
|
||||
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
|
||||
conn.getConnectionMetrics()), (resp, error) -> {
|
||||
if (error != null) {
|
||||
consumer.onError(error);
|
||||
return;
|
||||
|
|
|
@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
||||
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
private final Optional<MetricsConnection> metrics;
|
||||
|
||||
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
|
||||
User user) {
|
||||
this.conf = conf;
|
||||
|
@ -112,7 +116,12 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
}
|
||||
this.connConf = new AsyncConnectionConfiguration(conf);
|
||||
this.registry = registry;
|
||||
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
|
||||
} else {
|
||||
this.metrics = Optional.empty();
|
||||
}
|
||||
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
|
||||
this.rpcTimeout =
|
||||
|
@ -148,6 +157,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
if (authService != null) {
|
||||
authService.shutdown();
|
||||
}
|
||||
metrics.ifPresent(MetricsConnection::shutdown);
|
||||
closed = true;
|
||||
}
|
||||
|
||||
|
@ -312,4 +322,8 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
public void clearRegionLocationCache() {
|
||||
locator.clearCache();
|
||||
}
|
||||
|
||||
Optional<MetricsConnection> getConnectionMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
|
|||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
|
||||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -106,7 +107,7 @@ class AsyncMetaRegionLocator {
|
|||
|
||||
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
|
||||
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
|
||||
this::addLocationToCache, this::removeLocationFromCache);
|
||||
this::addLocationToCache, this::removeLocationFromCache, Optional.empty());
|
||||
}
|
||||
|
||||
void clearCache() {
|
||||
|
|
|
@ -338,15 +338,25 @@ class AsyncNonMetaRegionLocator {
|
|||
return true;
|
||||
}
|
||||
|
||||
private void recordCacheHit() {
|
||||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
|
||||
}
|
||||
|
||||
private void recordCacheMiss() {
|
||||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
|
||||
}
|
||||
|
||||
private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
|
||||
int replicaId) {
|
||||
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
|
||||
if (entry == null) {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
RegionLocations locs = entry.getValue();
|
||||
HRegionLocation loc = locs.getRegionLocation(replicaId);
|
||||
if (loc == null) {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
byte[] endKey = loc.getRegion().getEndKey();
|
||||
|
@ -355,8 +365,10 @@ class AsyncNonMetaRegionLocator {
|
|||
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
|
||||
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
|
||||
}
|
||||
recordCacheHit();
|
||||
return locs;
|
||||
} else {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -367,11 +379,13 @@ class AsyncNonMetaRegionLocator {
|
|||
Map.Entry<byte[], RegionLocations> entry =
|
||||
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
|
||||
if (entry == null) {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
RegionLocations locs = entry.getValue();
|
||||
HRegionLocation loc = locs.getRegionLocation(replicaId);
|
||||
if (loc == null) {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
|
||||
|
@ -380,8 +394,10 @@ class AsyncNonMetaRegionLocator {
|
|||
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
|
||||
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
|
||||
}
|
||||
recordCacheHit();
|
||||
return locs;
|
||||
} else {
|
||||
recordCacheMiss();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -529,6 +545,10 @@ class AsyncNonMetaRegionLocator {
|
|||
return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
|
||||
}
|
||||
|
||||
private void recordClearRegionCache() {
|
||||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
|
||||
}
|
||||
|
||||
private void removeLocationFromCache(HRegionLocation loc) {
|
||||
TableCache tableCache = cache.get(loc.getRegion().getTable());
|
||||
if (tableCache == null) {
|
||||
|
@ -544,10 +564,12 @@ class AsyncNonMetaRegionLocator {
|
|||
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
|
||||
if (newLocs == null) {
|
||||
if (tableCache.cache.remove(startKey, oldLocs)) {
|
||||
recordClearRegionCache();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
|
||||
recordClearRegionCache();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -569,7 +591,7 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
|
||||
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
|
||||
this::addLocationToCache, this::removeLocationFromCache);
|
||||
this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
|
||||
}
|
||||
|
||||
void clearCache(TableName tableName) {
|
||||
|
@ -583,6 +605,8 @@ class AsyncNonMetaRegionLocator {
|
|||
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
|
||||
}
|
||||
}
|
||||
conn.getConnectionMetrics()
|
||||
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
|
||||
}
|
||||
|
||||
void clearCache() {
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
|
|||
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
|
@ -51,7 +52,8 @@ final class AsyncRegionLocatorHelper {
|
|||
|
||||
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
|
||||
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
|
||||
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
|
||||
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
|
||||
Optional<MetricsConnection> metrics) {
|
||||
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
|
||||
|
@ -78,6 +80,7 @@ final class AsyncRegionLocatorHelper {
|
|||
addToCache.accept(newLoc);
|
||||
} else {
|
||||
LOG.debug("Try removing {} from cache", loc);
|
||||
metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
|
||||
removeFromCache.accept(loc);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -188,10 +188,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
// thread executor shared by all Table instances created
|
||||
// by this connection
|
||||
private volatile ExecutorService batchPool = null;
|
||||
private volatile ThreadPoolExecutor batchPool = null;
|
||||
// meta thread executor shared by all Table instances created
|
||||
// by this connection
|
||||
private volatile ExecutorService metaLookupPool = null;
|
||||
private volatile ThreadPoolExecutor metaLookupPool = null;
|
||||
private volatile boolean cleanupPool = false;
|
||||
|
||||
private final Configuration conf;
|
||||
|
@ -236,14 +236,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
* constructor
|
||||
* @param conf Configuration object
|
||||
*/
|
||||
ConnectionImplementation(Configuration conf,
|
||||
ExecutorService pool, User user) throws IOException {
|
||||
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
|
||||
this.conf = conf;
|
||||
this.user = user;
|
||||
if (user != null && user.isLoginFromKeytab()) {
|
||||
spawnRenewalChore(user.getUGI());
|
||||
}
|
||||
this.batchPool = pool;
|
||||
this.batchPool = (ThreadPoolExecutor) pool;
|
||||
this.connectionConfig = new ConnectionConfiguration(conf);
|
||||
this.closed = false;
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
|
@ -284,7 +283,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
|
||||
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
this.metrics = new MetricsConnection(this);
|
||||
this.metrics =
|
||||
new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
|
||||
} else {
|
||||
this.metrics = null;
|
||||
}
|
||||
|
@ -459,7 +459,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return this.metrics;
|
||||
}
|
||||
|
||||
private ExecutorService getBatchPool() {
|
||||
private ThreadPoolExecutor getBatchPool() {
|
||||
if (batchPool == null) {
|
||||
synchronized (this) {
|
||||
if (batchPool == null) {
|
||||
|
@ -472,7 +472,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return this.batchPool;
|
||||
}
|
||||
|
||||
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
|
||||
private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
|
||||
BlockingQueue<Runnable> passedWorkQueue) {
|
||||
// shared HTable thread executor not yet initialized
|
||||
if (maxThreads == 0) {
|
||||
|
@ -501,7 +501,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return tpe;
|
||||
}
|
||||
|
||||
private ExecutorService getMetaLookupPool() {
|
||||
private ThreadPoolExecutor getMetaLookupPool() {
|
||||
if (this.metaLookupPool == null) {
|
||||
synchronized (this) {
|
||||
if (this.metaLookupPool == null) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.net.InetAddress;
|
|||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
@ -500,13 +501,19 @@ public final class ConnectionUtils {
|
|||
/**
|
||||
* Connect the two futures, if the src future is done, then mark the dst future as done. And if
|
||||
* the dst future is done, then cancel the src future. This is used for timeline consistent read.
|
||||
* <p/>
|
||||
* Pass empty metrics if you want to link the primary future and the dst future so we will not
|
||||
* increase the hedge read related metrics.
|
||||
*/
|
||||
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
|
||||
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture,
|
||||
Optional<MetricsConnection> metrics) {
|
||||
addListener(srcFuture, (r, e) -> {
|
||||
if (e != null) {
|
||||
dstFuture.completeExceptionally(e);
|
||||
} else {
|
||||
dstFuture.complete(r);
|
||||
if (dstFuture.complete(r)) {
|
||||
metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
|
||||
}
|
||||
}
|
||||
});
|
||||
// The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
|
||||
|
@ -519,7 +526,7 @@ public final class ConnectionUtils {
|
|||
|
||||
private static <T> void sendRequestsToSecondaryReplicas(
|
||||
Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
|
||||
CompletableFuture<T> future) {
|
||||
CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
|
||||
if (future.isDone()) {
|
||||
// do not send requests to secondary replicas if the future is done, i.e, the primary request
|
||||
// has already been finished.
|
||||
|
@ -527,14 +534,15 @@ public final class ConnectionUtils {
|
|||
}
|
||||
for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
|
||||
CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
|
||||
connect(secondaryFuture, future);
|
||||
metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
|
||||
connect(secondaryFuture, future, metrics);
|
||||
}
|
||||
}
|
||||
|
||||
static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
|
||||
TableName tableName, Query query, byte[] row, RegionLocateType locateType,
|
||||
Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
|
||||
long primaryCallTimeoutNs, Timer retryTimer) {
|
||||
long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) {
|
||||
if (query.getConsistency() == Consistency.STRONG) {
|
||||
return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
|
||||
}
|
||||
|
@ -545,7 +553,7 @@ public final class ConnectionUtils {
|
|||
// Timeline consistent read, where we may send requests to other region replicas
|
||||
CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
connect(primaryFuture, future);
|
||||
connect(primaryFuture, future, Optional.empty());
|
||||
long startNs = System.nanoTime();
|
||||
// after the getRegionLocations, all the locations for the replicas of this region should have
|
||||
// been cached, so it is not big deal to locate them again when actually sending requests to
|
||||
|
@ -567,11 +575,11 @@ public final class ConnectionUtils {
|
|||
}
|
||||
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
|
||||
if (delayNs <= 0) {
|
||||
sendRequestsToSecondaryReplicas(requestReplica, locs, future);
|
||||
sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics);
|
||||
} else {
|
||||
retryTimer.newTimeout(
|
||||
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs,
|
||||
TimeUnit.NANOSECONDS);
|
||||
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics),
|
||||
delayNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
|
@ -305,30 +305,30 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
|
||||
MetricsConnection(final ConnectionImplementation conn) {
|
||||
this.scope = conn.toString();
|
||||
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
|
||||
Supplier<ThreadPoolExecutor> metaPool) {
|
||||
this.scope = scope;
|
||||
this.registry = new MetricRegistry();
|
||||
|
||||
this.registry.register(getExecutorPoolName(),
|
||||
new RatioGauge() {
|
||||
@Override
|
||||
protected Ratio getRatio() {
|
||||
ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
|
||||
if (batchPool == null) {
|
||||
ThreadPoolExecutor pool = batchPool.get();
|
||||
if (pool == null) {
|
||||
return Ratio.of(0, 0);
|
||||
}
|
||||
return Ratio.of(batchPool.getActiveCount(), batchPool.getMaximumPoolSize());
|
||||
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
|
||||
}
|
||||
});
|
||||
this.registry.register(getMetaPoolName(),
|
||||
new RatioGauge() {
|
||||
@Override
|
||||
protected Ratio getRatio() {
|
||||
ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
|
||||
if (metaPool == null) {
|
||||
ThreadPoolExecutor pool = metaPool.get();
|
||||
if (pool == null) {
|
||||
return Ratio.of(0, 0);
|
||||
}
|
||||
return Ratio.of(metaPool.getActiveCount(), metaPool.getMaximumPoolSize());
|
||||
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
|
||||
}
|
||||
});
|
||||
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
|
||||
|
@ -401,6 +401,11 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
metaCacheNumClearRegion.inc();
|
||||
}
|
||||
|
||||
/** Increment the number of meta cache drops requested for individual region. */
|
||||
public void incrMetaCacheNumClearRegion(int count) {
|
||||
metaCacheNumClearRegion.inc(count);
|
||||
}
|
||||
|
||||
/** Increment the number of hedged read that have occurred. */
|
||||
public void incrHedgedReadOps() {
|
||||
hedgedReadOps.inc();
|
||||
|
|
|
@ -232,7 +232,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
public CompletableFuture<Result> get(Get get) {
|
||||
return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
|
||||
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
|
||||
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
|
||||
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
|
|||
import com.codahale.metrics.RatioGauge;
|
||||
import com.codahale.metrics.RatioGauge.Ratio;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MetricsTests;
|
||||
|
@ -35,7 +34,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||
|
||||
|
@ -57,13 +55,11 @@ public class TestMetricsConnection {
|
|||
HBaseClassTestRule.forClass(TestMetricsConnection.class);
|
||||
|
||||
private static MetricsConnection METRICS;
|
||||
private static final ExecutorService BATCH_POOL = Executors.newFixedThreadPool(2);
|
||||
private static final ThreadPoolExecutor BATCH_POOL =
|
||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class);
|
||||
Mockito.when(mocked.toString()).thenReturn("mocked-connection");
|
||||
Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL);
|
||||
METRICS = new MetricsConnection(mocked);
|
||||
METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
Loading…
Reference in New Issue