From a3d2a2df3a0837f39d586f5f2018fd630883fa10 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 17 Apr 2019 21:45:54 +0800 Subject: [PATCH] HBASE-22244 Make use of MetricsConnection in async client --- .../hbase/client/AsyncClientScanner.java | 7 +++-- .../hbase/client/AsyncConnectionImpl.java | 16 +++++++++++- .../hbase/client/AsyncMetaRegionLocator.java | 3 ++- .../client/AsyncNonMetaRegionLocator.java | 26 ++++++++++++++++++- .../client/AsyncRegionLocatorHelper.java | 5 +++- .../client/ConnectionImplementation.java | 18 ++++++------- .../hadoop/hbase/client/ConnectionUtils.java | 26 ++++++++++++------- .../hbase/client/MetricsConnection.java | 25 +++++++++++------- .../hbase/client/RawAsyncTableImpl.java | 2 +- .../hbase/client/TestMetricsConnection.java | 12 +++------ 10 files changed, 95 insertions(+), 45 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 6d4aefde2a4..d6cca48e9c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -197,10 +197,9 @@ class AsyncClientScanner { private void openScanner() { incRegionCountMetrics(scanMetrics); openScannerTries.set(1); - addListener( - timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), - getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer), - (resp, error) -> { + addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), + getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, + conn.getConnectionMetrics()), (resp, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index c972d119d3d..f046e7a352e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection { private volatile boolean closed = false; + private final Optional metrics; + public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, User user) { this.conf = conf; @@ -112,7 +116,12 @@ class AsyncConnectionImpl implements AsyncConnection { } this.connConf = new AsyncConnectionConfiguration(conf); this.registry = registry; - this.rpcClient = RpcClientFactory.createClient(conf, clusterId); + if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { + this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null)); + } else { + this.metrics = Optional.empty(); + } + this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null)); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.rpcTimeout = @@ -148,6 +157,7 @@ class AsyncConnectionImpl implements AsyncConnection { if (authService != null) { authService.shutdown(); } + metrics.ifPresent(MetricsConnection::shutdown); closed = true; } @@ -312,4 +322,8 @@ class AsyncConnectionImpl implements AsyncConnection { public void clearRegionLocationCache() { locator.clearCache(); } + + Optional getConnectionMetrics() { + return metrics; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index fa08795b17a..54bf9ff38f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HRegionLocation; @@ -106,7 +107,7 @@ class AsyncMetaRegionLocator { void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation, - this::addLocationToCache, this::removeLocationFromCache); + this::addLocationToCache, this::removeLocationFromCache, Optional.empty()); } void clearCache() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 7f25708df2e..bbb84d04f3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -338,15 +338,25 @@ class AsyncNonMetaRegionLocator { return true; } + private void recordCacheHit() { + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit); + } + + private void recordCacheMiss() { + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); + } + private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, int replicaId) { Map.Entry entry = tableCache.cache.floorEntry(row); if (entry == null) { + recordCacheMiss(); return null; } RegionLocations locs = entry.getValue(); HRegionLocation loc = locs.getRegionLocation(replicaId); if (loc == null) { + recordCacheMiss(); return null; } byte[] endKey = loc.getRegion().getEndKey(); @@ -355,8 +365,10 @@ class AsyncNonMetaRegionLocator { LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); } + recordCacheHit(); return locs; } else { + recordCacheMiss(); return null; } } @@ -367,11 +379,13 @@ class AsyncNonMetaRegionLocator { Map.Entry entry = isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); if (entry == null) { + recordCacheMiss(); return null; } RegionLocations locs = entry.getValue(); HRegionLocation loc = locs.getRegionLocation(replicaId); if (loc == null) { + recordCacheMiss(); return null; } if (isEmptyStopRow(loc.getRegion().getEndKey()) || @@ -380,8 +394,10 @@ class AsyncNonMetaRegionLocator { LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); } + recordCacheHit(); return locs; } else { + recordCacheMiss(); return null; } } @@ -529,6 +545,10 @@ class AsyncNonMetaRegionLocator { return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); } + private void recordClearRegionCache() { + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion); + } + private void removeLocationFromCache(HRegionLocation loc) { TableCache tableCache = cache.get(loc.getRegion().getTable()); if (tableCache == null) { @@ -544,10 +564,12 @@ class AsyncNonMetaRegionLocator { RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); if (newLocs == null) { if (tableCache.cache.remove(startKey, oldLocs)) { + recordClearRegionCache(); return; } } else { if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { + recordClearRegionCache(); return; } } @@ -569,7 +591,7 @@ class AsyncNonMetaRegionLocator { void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, - this::addLocationToCache, this::removeLocationFromCache); + this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics()); } void clearCache(TableName tableName) { @@ -583,6 +605,8 @@ class AsyncNonMetaRegionLocator { tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); } } + conn.getConnectionMetrics() + .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); } void clearCache() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java index 4dde1bbc89c..2836e4b11e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; import java.util.Arrays; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import org.apache.commons.lang3.ObjectUtils; @@ -51,7 +52,8 @@ final class AsyncRegionLocatorHelper { static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception, Function cachedLocationSupplier, - Consumer addToCache, Consumer removeFromCache) { + Consumer addToCache, Consumer removeFromCache, + Optional metrics) { HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); if (LOG.isDebugEnabled()) { LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc, @@ -78,6 +80,7 @@ final class AsyncRegionLocatorHelper { addToCache.accept(newLoc); } else { LOG.debug("Try removing {} from cache", loc); + metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception)); removeFromCache.accept(loc); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index f5b0b033f24..2954e045e40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -190,10 +190,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // thread executor shared by all Table instances created // by this connection - private volatile ExecutorService batchPool = null; + private volatile ThreadPoolExecutor batchPool = null; // meta thread executor shared by all Table instances created // by this connection - private volatile ExecutorService metaLookupPool = null; + private volatile ThreadPoolExecutor metaLookupPool = null; private volatile boolean cleanupPool = false; private final Configuration conf; @@ -238,14 +238,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * constructor * @param conf Configuration object */ - ConnectionImplementation(Configuration conf, - ExecutorService pool, User user) throws IOException { + ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { this.conf = conf; this.user = user; if (user != null && user.isLoginFromKeytab()) { spawnRenewalChore(user.getUGI()); } - this.batchPool = pool; + this.batchPool = (ThreadPoolExecutor) pool; this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -286,7 +285,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { - this.metrics = new MetricsConnection(this); + this.metrics = + new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool); } else { this.metrics = null; } @@ -461,7 +461,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return this.metrics; } - private ExecutorService getBatchPool() { + private ThreadPoolExecutor getBatchPool() { if (batchPool == null) { synchronized (this) { if (batchPool == null) { @@ -474,7 +474,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return this.batchPool; } - private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, + private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint, BlockingQueue passedWorkQueue) { // shared HTable thread executor not yet initialized if (maxThreads == 0) { @@ -503,7 +503,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return tpe; } - private ExecutorService getMetaLookupPool() { + private ThreadPoolExecutor getMetaLookupPool() { if (this.metaLookupPool == null) { synchronized (this) { if (this.metaLookupPool == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 101dda055da..6b06a7f3e59 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -28,6 +28,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -500,13 +501,19 @@ public final class ConnectionUtils { /** * Connect the two futures, if the src future is done, then mark the dst future as done. And if * the dst future is done, then cancel the src future. This is used for timeline consistent read. + *

+ * Pass empty metrics if you want to link the primary future and the dst future so we will not + * increase the hedge read related metrics. */ - private static void connect(CompletableFuture srcFuture, CompletableFuture dstFuture) { + private static void connect(CompletableFuture srcFuture, CompletableFuture dstFuture, + Optional metrics) { addListener(srcFuture, (r, e) -> { if (e != null) { dstFuture.completeExceptionally(e); } else { - dstFuture.complete(r); + if (dstFuture.complete(r)) { + metrics.ifPresent(MetricsConnection::incrHedgedReadWin); + } } }); // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. @@ -519,7 +526,7 @@ public final class ConnectionUtils { private static void sendRequestsToSecondaryReplicas( Function> requestReplica, RegionLocations locs, - CompletableFuture future) { + CompletableFuture future, Optional metrics) { if (future.isDone()) { // do not send requests to secondary replicas if the future is done, i.e, the primary request // has already been finished. @@ -527,14 +534,15 @@ public final class ConnectionUtils { } for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { CompletableFuture secondaryFuture = requestReplica.apply(replicaId); - connect(secondaryFuture, future); + metrics.ifPresent(MetricsConnection::incrHedgedReadOps); + connect(secondaryFuture, future, metrics); } } static CompletableFuture timelineConsistentRead(AsyncRegionLocator locator, TableName tableName, Query query, byte[] row, RegionLocateType locateType, Function> requestReplica, long rpcTimeoutNs, - long primaryCallTimeoutNs, Timer retryTimer) { + long primaryCallTimeoutNs, Timer retryTimer, Optional metrics) { if (query.getConsistency() == Consistency.STRONG) { return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); } @@ -545,7 +553,7 @@ public final class ConnectionUtils { // Timeline consistent read, where we may send requests to other region replicas CompletableFuture primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); CompletableFuture future = new CompletableFuture<>(); - connect(primaryFuture, future); + connect(primaryFuture, future, Optional.empty()); long startNs = System.nanoTime(); // after the getRegionLocations, all the locations for the replicas of this region should have // been cached, so it is not big deal to locate them again when actually sending requests to @@ -567,11 +575,11 @@ public final class ConnectionUtils { } long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); if (delayNs <= 0) { - sendRequestsToSecondaryReplicas(requestReplica, locs, future); + sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics); } else { retryTimer.newTimeout( - timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs, - TimeUnit.NANOSECONDS); + timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics), + delayNs, TimeUnit.NANOSECONDS); } }); return future; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index ac1a02ab30c..c62a7121f12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import java.util.function.Supplier; import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; @@ -305,30 +305,30 @@ public class MetricsConnection implements StatisticTrackable { private final ConcurrentMap cacheDroppingExceptions = new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); - MetricsConnection(final ConnectionImplementation conn) { - this.scope = conn.toString(); + MetricsConnection(String scope, Supplier batchPool, + Supplier metaPool) { + this.scope = scope; this.registry = new MetricRegistry(); - this.registry.register(getExecutorPoolName(), new RatioGauge() { @Override protected Ratio getRatio() { - ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); - if (batchPool == null) { + ThreadPoolExecutor pool = batchPool.get(); + if (pool == null) { return Ratio.of(0, 0); } - return Ratio.of(batchPool.getActiveCount(), batchPool.getMaximumPoolSize()); + return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); } }); this.registry.register(getMetaPoolName(), new RatioGauge() { @Override protected Ratio getRatio() { - ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); - if (metaPool == null) { + ThreadPoolExecutor pool = metaPool.get(); + if (pool == null) { return Ratio.of(0, 0); } - return Ratio.of(metaPool.getActiveCount(), metaPool.getMaximumPoolSize()); + return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); } }); this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope)); @@ -401,6 +401,11 @@ public class MetricsConnection implements StatisticTrackable { metaCacheNumClearRegion.inc(); } + /** Increment the number of meta cache drops requested for individual region. */ + public void incrMetaCacheNumClearRegion(int count) { + metaCacheNumClearRegion.inc(count); + } + /** Increment the number of hedged read that have occurred. */ public void incrHedgedReadOps() { hedgedReadOps.inc(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 86f11b9a1bd..688c86fe655 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -232,7 +232,7 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture get(Get get) { return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, - conn.connConf.getPrimaryCallTimeoutNs(), retryTimer); + conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java index 97a672db6fa..bfbaf97be21 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals; import com.codahale.metrics.RatioGauge; import com.codahale.metrics.RatioGauge.Ratio; import java.io.IOException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MetricsTests; @@ -35,7 +34,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -57,13 +55,11 @@ public class TestMetricsConnection { HBaseClassTestRule.forClass(TestMetricsConnection.class); private static MetricsConnection METRICS; - private static final ExecutorService BATCH_POOL = Executors.newFixedThreadPool(2); + private static final ThreadPoolExecutor BATCH_POOL = + (ThreadPoolExecutor) Executors.newFixedThreadPool(2); @BeforeClass public static void beforeClass() { - ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class); - Mockito.when(mocked.toString()).thenReturn("mocked-connection"); - Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL); - METRICS = new MetricsConnection(mocked); + METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null); } @AfterClass