From 65fb40d7eebcbedae6d16c805a53c441e24bf6a8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 21 Apr 2019 11:58:52 +0800 Subject: [PATCH] HBASE-22267 Implement client push back for async client --- .../client/AsyncBatchRpcRetryingCaller.java | 119 +++++++---- .../hbase/client/AsyncConnectionImpl.java | 15 ++ .../hbase/client/AsyncRequestFutureImpl.java | 42 +--- .../hadoop/hbase/client/ConnectionUtils.java | 23 ++ .../hbase/client/MetricsConnection.java | 8 +- .../hbase/client/RawAsyncTableImpl.java | 12 +- .../hbase/client/ServerStatisticTracker.java | 10 +- .../hadoop/hbase/client/TestAsyncProcess.java | 2 +- .../hbase/client/ClientPushbackTestBase.java | 188 ++++++++++++++++ .../hbase/client/TestAsyncClientPushback.java | 96 +++++++++ .../hbase/client/TestClientPushback.java | 201 +++++------------- 11 files changed, 472 insertions(+), 244 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index f9bcf74888f..e429422d1c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -134,6 +136,10 @@ class AsyncBatchRpcRetryingCaller { () -> new RegionRequest(loc)).actions.add(action); } + public void setRegionRequest(byte[] regionName, RegionRequest regionReq) { + actionsByRegion.put(regionName, regionReq); + } + public int getPriority() { return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); @@ -298,6 +304,8 @@ class AsyncBatchRpcRetryingCaller { private void onComplete(Map actionsByRegion, int tries, ServerName serverName, MultiResponse resp) { + ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), + serverName, resp); List failedActions = new ArrayList<>(); MutableBoolean retryImmediately = new MutableBoolean(false); actionsByRegion.forEach((rn, regionReq) -> { @@ -333,55 +341,88 @@ class AsyncBatchRpcRetryingCaller { } } - private void send(Map actionsByServer, int tries) { + private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) { long remainingNs; if (operationTimeoutNs > 0) { remainingNs = remainingTimeNs(); if (remainingNs <= 0) { - failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream()) - .flatMap(r -> r.actions.stream()), tries); + failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), + tries); return; } } else { remainingNs = Long.MAX_VALUE; } - actionsByServer.forEach((sn, serverReq) -> { - ClientService.Interface stub; - try { - stub = conn.getRegionServerStub(sn); - } catch (IOException e) { - onError(serverReq.actionsByRegion, tries, e, sn); - return; + ClientService.Interface stub; + try { + stub = conn.getRegionServerStub(serverName); + } catch (IOException e) { + onError(serverReq.actionsByRegion, tries, e, serverName); + return; + } + ClientProtos.MultiRequest req; + List cells = new ArrayList<>(); + // Map from a created RegionAction to the original index for a RowMutations within + // the original list of actions. This will be used to process the results when there + // is RowMutations in the action list. + Map rowMutationsIndexMap = new HashMap<>(); + try { + req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); + } catch (IOException e) { + onError(serverReq.actionsByRegion, tries, e, serverName); + return; + } + HBaseRpcController controller = conn.rpcControllerFactory.newController(); + resetController(controller, Math.min(rpcTimeoutNs, remainingNs), + calcPriority(serverReq.getPriority(), tableName)); + if (!cells.isEmpty()) { + controller.setCellScanner(createCellScanner(cells)); + } + stub.multi(controller, req, resp -> { + if (controller.failed()) { + onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName); + } else { + try { + onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req, + rowMutationsIndexMap, resp, controller.cellScanner())); + } catch (Exception e) { + onError(serverReq.actionsByRegion, tries, e, serverName); + return; + } } - ClientProtos.MultiRequest req; - List cells = new ArrayList<>(); - // Map from a created RegionAction to the original index for a RowMutations within - // the original list of actions. This will be used to process the results when there - // is RowMutations in the action list. - Map rowMutationsIndexMap = new HashMap<>(); - try { - req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); - } catch (IOException e) { - onError(serverReq.actionsByRegion, tries, e, sn); - return; - } - HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(rpcTimeoutNs, remainingNs), - calcPriority(serverReq.getPriority(), tableName)); - if (!cells.isEmpty()) { - controller.setCellScanner(createCellScanner(cells)); - } - stub.multi(controller, req, resp -> { - if (controller.failed()) { - onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn); + }); + } + + // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit, + // based on the load of the region server and the region. + private void sendOrDelay(Map actionsByServer, int tries) { + Optional metrics = conn.getConnectionMetrics(); + Optional optStats = conn.getStatisticsTracker(); + if (!optStats.isPresent()) { + actionsByServer.forEach((serverName, serverReq) -> { + metrics.ifPresent(MetricsConnection::incrNormalRunners); + sendToServer(serverName, serverReq, tries); + }); + return; + } + ServerStatisticTracker stats = optStats.get(); + ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); + actionsByServer.forEach((serverName, serverReq) -> { + ServerStatistics serverStats = stats.getStats(serverName); + Map groupByBackoff = new HashMap<>(); + serverReq.actionsByRegion.forEach((regionName, regionReq) -> { + long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats); + groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest()) + .setRegionRequest(regionName, regionReq); + }); + groupByBackoff.forEach((backoff, sr) -> { + if (backoff > 0) { + metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff)); + retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff, + TimeUnit.MILLISECONDS); } else { - try { - onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req, - rowMutationsIndexMap, resp, controller.cellScanner())); - } catch (Exception e) { - onError(serverReq.actionsByRegion, tries, e, sn); - return; - } + metrics.ifPresent(MetricsConnection::incrNormalRunners); + sendToServer(serverName, sr, tries); } }); }); @@ -454,7 +495,7 @@ class AsyncBatchRpcRetryingCaller { })) .toArray(CompletableFuture[]::new)), (v, r) -> { if (!actionsByServer.isEmpty()) { - send(actionsByServer, tries); + sendOrDelay(actionsByServer, tries); } if (!locateFailed.isEmpty()) { tryResubmit(locateFailed.stream(), tries, false); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index f046e7a352e..7d59984540e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection { private final AtomicReference> masterStubMakeFuture = new AtomicReference<>(); + private final Optional stats; + private final ClientBackoffPolicy backoffPolicy; + private ChoreService authService; private volatile boolean closed = false; @@ -133,6 +138,8 @@ class AsyncConnectionImpl implements AsyncConnection { } else { nonceGenerator = NO_NONCE_GENERATOR; } + this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); } private void spawnRenewalChore(final UserGroupInformation user) { @@ -233,6 +240,14 @@ class AsyncConnectionImpl implements AsyncConnection { masterStub.compareAndSet(stub, null); } + Optional getStatisticsTracker() { + return stats; + } + + ClientBackoffPolicy getBackoffPolicy() { + return backoffPolicy; + } + @Override public AsyncTableBuilder getTableBuilder(TableName tableName) { return new AsyncTableBuilderBase(tableName, connConf) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 525033d4c93..e46a50e7619 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -28,6 +28,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -55,9 +56,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; - /** * The context, and return value, for a single submit/submitAll call. * Note on how this class (one AP submit) works. Initially, all requests are split into groups @@ -614,8 +612,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { traceText = "AsyncProcess.clientBackoff.sendMultiAction"; runnable = runner; if (asyncProcess.connection.getConnectionMetrics() != null) { - asyncProcess.connection.getConnectionMetrics().incrDelayRunners(); - asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime()); + asyncProcess.connection.getConnectionMetrics() + .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime()); } } else { if (asyncProcess.connection.getConnectionMetrics() != null) { @@ -802,19 +800,16 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param responses - the response, if any * @param numAttempt - the attempt */ - private void receiveMultiAction(MultiAction multiAction, - ServerName server, MultiResponse responses, int numAttempt) { + private void receiveMultiAction(MultiAction multiAction, ServerName server, + MultiResponse responses, int numAttempt) { assert responses != null; - - Map results = responses.getResults(); - updateStats(server, results); - + updateStats(server, responses); // Success or partial success // Analyze detailed results. We can still have individual failures to be redo. // two specific throwables are managed: // - DoNotRetryIOException: we continue to retry for other actions // - RegionMovedException: we update the cache with the new region location - + Map results = responses.getResults(); List toReplay = new ArrayList<>(); Throwable lastException = null; int failureCount = 0; @@ -926,26 +921,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } @VisibleForTesting - protected void updateStats(ServerName server, Map results) { - boolean metrics = asyncProcess.connection.getConnectionMetrics() != null; - boolean stats = asyncProcess.connection.getStatisticsTracker() != null; - if (!stats && !metrics) { - return; - } - for (Map.Entry regionStats : results.entrySet()) { - byte[] regionName = regionStats.getKey(); - ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat(); - if (stat == null) { - LOG.error("No ClientProtos.RegionLoadStats found for server=" + server - + ", region=" + Bytes.toStringBinary(regionName)); - continue; - } - RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat); - ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server, - regionName, regionLoadstats); - ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(), - server, regionName, regionLoadstats); - } + protected void updateStats(ServerName server, MultiResponse resp) { + ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()), + Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 6b06a7f3e59..4a2fa3a9b55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -62,8 +62,10 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.io.netty.util.Timer; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; @@ -672,4 +674,25 @@ public final class ConnectionUtils { } } } + + static void updateStats(Optional optStats, + Optional optMetrics, ServerName serverName, MultiResponse resp) { + if (!optStats.isPresent() && !optMetrics.isPresent()) { + // ServerStatisticTracker and MetricsConnection are both not present, just return + return; + } + resp.getResults().forEach((regionName, regionResult) -> { + ClientProtos.RegionLoadStats stat = regionResult.getStat(); + if (stat == null) { + LOG.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName, + Bytes.toStringBinary(regionName)); + return; + } + RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat); + optStats.ifPresent( + stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats)); + optMetrics.ifPresent( + metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats)); + }); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index c62a7121f12..d842f907937 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -421,13 +421,9 @@ public class MetricsConnection implements StatisticTrackable { this.runnerStats.incrNormalRunners(); } - /** Increment the number of delay runner counts. */ - public void incrDelayRunners() { + /** Increment the number of delay runner counts and update delay interval of delay runner. */ + public void incrDelayRunnersAndUpdateDelayInterval(long interval) { this.runnerStats.incrDelayRunners(); - } - - /** Update delay interval of delay runner. */ - public void updateDelayInterval(long interval) { this.runnerStats.updateDelayInterval(interval); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 688c86fe655..6b876536382 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -357,8 +357,8 @@ class RawAsyncTableImpl implements AsyncTable { preCheck(); return RawAsyncTableImpl.this . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, - stub, mutation, + .action((controller, loc, stub) -> RawAsyncTableImpl.this. mutateRow(controller, + loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), resp -> resp.getExists())) @@ -373,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable { // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // so here I write a new method as I do not want to change the abstraction of call method. - private static CompletableFuture mutateRow(HBaseRpcController controller, + private CompletableFuture mutateRow(HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, Converter reqConvert, Function respConverter) { @@ -391,6 +391,8 @@ class RawAsyncTableImpl implements AsyncTable { try { org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter.getResults(req, resp, controller.cellScanner()); + ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), + loc.getServerName(), multiResp); Throwable ex = multiResp.getException(regionName); if (ex != null) { future.completeExceptionally(ex instanceof IOException ? ex @@ -415,8 +417,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture mutateRow(RowMutations mutation) { return this. newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, - mutation, (rn, rm) -> { + .action((controller, loc, stub) -> this. mutateRow(controller, loc, stub, mutation, + (rn, rm) -> { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java index c5c7375cc3c..12e3e3bd990 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -19,15 +19,12 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.util.concurrent.ConcurrentHashMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.yetus.audience.InterfaceAudience; /** * Tracks the statistics for multiple regions @@ -53,9 +50,4 @@ public class ServerStatisticTracker implements StatisticTrackable { } return new ServerStatisticTracker(); } - - @VisibleForTesting - ServerStatistics getServerStatsForTesting(ServerName server) { - return stats.get(server); - } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 71b21ac0d48..81dcc46f484 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -271,7 +271,7 @@ public class TestAsyncProcess { } @Override - protected void updateStats(ServerName server, Map results) { + protected void updateStats(ServerName server, MultiResponse resp) { // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java new file mode 100644 index 00000000000..a7202b857e6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that we can actually send and use region metrics to slowdown client writes + */ +public abstract class ClientPushbackTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(ClientPushbackTestBase.class); + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static final TableName tableName = TableName.valueOf("client-pushback"); + private static final byte[] family = Bytes.toBytes("f"); + private static final byte[] qualifier = Bytes.toBytes("q"); + private static final long flushSizeBytes = 512; + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = UTIL.getConfiguration(); + // enable backpressure + conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + // use the exponential backoff policy + conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class, + ClientBackoffPolicy.class); + // turn the memstore size way down so we don't need to write a lot to see changes in memstore + // load + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); + // ensure we block the flushes when we are double that flushsize + conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); + UTIL.startMiniCluster(1); + UTIL.createTable(tableName, family); + } + + @AfterClass + public static void cleanupCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + protected abstract ClientBackoffPolicy getBackoffPolicy() throws IOException; + + protected abstract ServerStatisticTracker getStatisticsTracker() throws IOException; + + protected abstract MetricsConnection getConnectionMetrics() throws IOException; + + protected abstract void mutate(Put put) throws IOException; + + protected abstract void mutate(Put put, AtomicLong endTime, CountDownLatch latch) + throws IOException; + + protected abstract void mutateRow(RowMutations mutations) throws IOException; + + @Test + public void testClientTracksServerPushback() throws Exception { + HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); + Region region = rs.getRegions(tableName).get(0); + + LOG.debug("Writing some data to " + tableName); + // write some data + Put p = new Put(Bytes.toBytes("row")); + p.addColumn(family, qualifier, Bytes.toBytes("value1")); + mutate(p); + + // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data + int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes); + LOG.debug("Done writing some data to " + tableName); + + // get the stats for the region hosting our table + ClientBackoffPolicy backoffPolicy = getBackoffPolicy(); + assertTrue("Backoff policy is not correctly configured", + backoffPolicy instanceof ExponentialClientBackoffPolicy); + + ServerStatisticTracker stats = getStatisticsTracker(); + assertNotNull("No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = rs.getServerName(); + byte[] regionName = region.getRegionInfo().getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getStats(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + assertEquals("We did not find some load on the memstore", load, + regionStats.getMemStoreLoadPercent()); + // check that the load reported produces a nonzero delay + long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats); + assertNotEquals("Reported load does not produce a backoff", 0, backoffTime); + LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " + + server + " is " + backoffTime); + + CountDownLatch latch = new CountDownLatch(1); + AtomicLong endTime = new AtomicLong(); + long startTime = EnvironmentEdgeManager.currentTime(); + mutate(p, endTime, latch); + // Currently the ExponentialClientBackoffPolicy under these test conditions + // produces a backoffTime of 151 milliseconds. This is long enough so the + // wait and related checks below are reasonable. Revisit if the backoff + // time reported by above debug logging has significantly deviated. + MetricsConnection metrics = getConnectionMetrics(); + String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); + MetricsConnection.RegionStats rsStats = metrics.serverStats.get(server).get(regionName); + assertEquals(name, rsStats.name); + assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(), + (double) regionStats.getHeapOccupancyPercent(), 0.1); + assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(), + (double) regionStats.getMemStoreLoadPercent(), 0.1); + + MetricsConnection.RunnerStats runnerStats = metrics.runnerStats; + + assertEquals(1, runnerStats.delayRunners.getCount()); + assertEquals(1, runnerStats.normalRunners.getCount()); + assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), (double) backoffTime, + 0.1); + + latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); + assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get()); + assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); + } + + @Test + public void testMutateRowStats() throws IOException { + HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); + Region region = rs.getRegions(tableName).get(0); + + RowMutations mutations = new RowMutations(Bytes.toBytes("row")); + Put p = new Put(Bytes.toBytes("row")); + p.addColumn(family, qualifier, Bytes.toBytes("value2")); + mutations.add(p); + mutateRow(mutations); + + ServerStatisticTracker stats = getStatisticsTracker(); + assertNotNull("No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = rs.getServerName(); + byte[] regionName = region.getRegionInfo().getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getStats(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + + assertNotNull(regionStats); + assertTrue(regionStats.getMemStoreLoadPercent() > 0); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java new file mode 100644 index 00000000000..cc030d8606e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncClientPushback extends ClientPushbackTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncClientPushback.class); + + private AsyncConnectionImpl conn; + + private AsyncBufferedMutator mutator; + + @Before + public void setUp() throws Exception { + conn = + (AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + mutator = conn.getBufferedMutator(tableName); + } + + @After + public void tearDown() throws IOException { + Closeables.close(mutator, true); + Closeables.close(conn, true); + } + + @Override + protected ClientBackoffPolicy getBackoffPolicy() throws IOException { + return conn.getBackoffPolicy(); + } + + @Override + protected ServerStatisticTracker getStatisticsTracker() throws IOException { + return conn.getStatisticsTracker().get(); + } + + @Override + protected MetricsConnection getConnectionMetrics() throws IOException { + return conn.getConnectionMetrics().get(); + } + + @Override + protected void mutate(Put put) throws IOException { + CompletableFuture future = mutator.mutate(put); + mutator.flush(); + future.join(); + } + + @Override + protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException { + FutureUtils.addListener(mutator.mutate(put), (r, e) -> { + endTime.set(EnvironmentEdgeManager.currentTime()); + latch.countDown(); + }); + mutator.flush(); + } + + @Override + protected void mutateRow(RowMutations mutations) throws IOException { + conn.getTable(tableName).mutateRow(mutations).join(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index d6f32f5483f..e789349aae0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -17,193 +17,90 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; -import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; -import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - * Test that we can actually send and use region metrics to slowdown client writes - */ -@Category(MediumTests.class) -public class TestClientPushback { +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestClientPushback extends ClientPushbackTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestClientPushback.class); + HBaseClassTestRule.forClass(TestClientPushback.class); - private static final Logger LOG = LoggerFactory.getLogger(TestClientPushback.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private ConnectionImplementation conn; - private static final TableName tableName = TableName.valueOf("client-pushback"); - private static final byte[] family = Bytes.toBytes("f"); - private static final byte[] qualifier = Bytes.toBytes("q"); - private static final long flushSizeBytes = 512; + private BufferedMutatorImpl mutator; - @BeforeClass - public static void setupCluster() throws Exception{ - Configuration conf = UTIL.getConfiguration(); - // enable backpressure - conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); - // use the exponential backoff policy - conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class, - ClientBackoffPolicy.class); - // turn the memstore size way down so we don't need to write a lot to see changes in memstore - // load - conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); - // ensure we block the flushes when we are double that flushsize - conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); - conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); - UTIL.startMiniCluster(1); - UTIL.createTable(tableName, family); + @Before + public void setUp() throws IOException { + conn = (ConnectionImplementation) ConnectionFactory.createConnection(UTIL.getConfiguration()); + mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName); } - @AfterClass - public static void teardownCluster() throws Exception{ - UTIL.shutdownMiniCluster(); + @After + public void tearDown() throws IOException { + Closeables.close(mutator, true); + Closeables.close(conn, true); } - @Test - public void testClientTracksServerPushback() throws Exception{ - Configuration conf = UTIL.getConfiguration(); + @Override + protected ClientBackoffPolicy getBackoffPolicy() throws IOException { + return conn.getBackoffPolicy(); + } - ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName); + @Override + protected ServerStatisticTracker getStatisticsTracker() throws IOException { + return conn.getStatisticsTracker(); + } - HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); - Region region = rs.getRegions(tableName).get(0); + @Override + protected MetricsConnection getConnectionMetrics() throws IOException { + return conn.getConnectionMetrics(); + } - LOG.debug("Writing some data to "+tableName); - // write some data - Put p = new Put(Bytes.toBytes("row")); - p.addColumn(family, qualifier, Bytes.toBytes("value1")); - mutator.mutate(p); + @Override + protected void mutate(Put put) throws IOException { + mutator.mutate(put); mutator.flush(); + } - // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data - int load = (int) ((region.getMemStoreHeapSize() * 100) - / flushSizeBytes); - LOG.debug("Done writing some data to "+tableName); - - // get the stats for the region hosting our table - ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); - assertTrue("Backoff policy is not correctly configured", - backoffPolicy instanceof ExponentialClientBackoffPolicy); - - ServerStatisticTracker stats = conn.getStatisticsTracker(); - assertNotNull( "No stats configured for the client!", stats); - // get the names so we can query the stats - ServerName server = rs.getServerName(); - byte[] regionName = region.getRegionInfo().getRegionName(); - - // check to see we found some load on the memstore - ServerStatistics serverStats = stats.getServerStatsForTesting(server); - ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); - assertEquals("We did not find some load on the memstore", load, - regionStats.getMemStoreLoadPercent()); - // check that the load reported produces a nonzero delay - long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats); - assertNotEquals("Reported load does not produce a backoff", 0, backoffTime); - LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " + - server + " is " + backoffTime); - + @Override + protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException { // Reach into the connection and submit work directly to AsyncProcess so we can // monitor how long the submission was delayed via a callback List ops = new ArrayList<>(1); - ops.add(p); - final CountDownLatch latch = new CountDownLatch(1); - final AtomicLong endTime = new AtomicLong(); - long startTime = EnvironmentEdgeManager.currentTime(); + ops.add(put); Batch.Callback callback = (byte[] r, byte[] row, Result result) -> { - endTime.set(EnvironmentEdgeManager.currentTime()); - latch.countDown(); + endTime.set(EnvironmentEdgeManager.currentTime()); + latch.countDown(); }; - AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) - .setPool(mutator.getPool()) - .setTableName(tableName) - .setRowAccess(ops) - .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) - .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) - .setRpcTimeout(60 * 1000) - .build(); + AsyncProcessTask task = + AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName) + .setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) + .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) + .setRpcTimeout(60 * 1000).build(); mutator.getAsyncProcess().submit(task); - // Currently the ExponentialClientBackoffPolicy under these test conditions - // produces a backoffTime of 151 milliseconds. This is long enough so the - // wait and related checks below are reasonable. Revisit if the backoff - // time reported by above debug logging has significantly deviated. - String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); - MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics(). - serverStats.get(server).get(regionName); - assertEquals(name, rsStats.name); - assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(), - (double)regionStats.getHeapOccupancyPercent(), 0.1 ); - assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(), - (double)regionStats.getMemStoreLoadPercent(), 0.1); - - MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats; - - assertEquals(1, runnerStats.delayRunners.getCount()); - assertEquals(1, runnerStats.normalRunners.getCount()); - assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), - (double)backoffTime, 0.1); - - latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); - assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get()); - assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); } - @Test - public void testMutateRowStats() throws IOException { - Configuration conf = UTIL.getConfiguration(); - ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); - Table table = conn.getTable(tableName); - HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); - Region region = rs.getRegions(tableName).get(0); - - RowMutations mutations = new RowMutations(Bytes.toBytes("row")); - Put p = new Put(Bytes.toBytes("row")); - p.addColumn(family, qualifier, Bytes.toBytes("value2")); - mutations.add(p); - table.mutateRow(mutations); - - ServerStatisticTracker stats = conn.getStatisticsTracker(); - assertNotNull( "No stats configured for the client!", stats); - // get the names so we can query the stats - ServerName server = rs.getServerName(); - byte[] regionName = region.getRegionInfo().getRegionName(); - - // check to see we found some load on the memstore - ServerStatistics serverStats = stats.getServerStatsForTesting(server); - ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); - - assertNotNull(regionStats); - assertTrue(regionStats.getMemStoreLoadPercent() > 0); + @Override + protected void mutateRow(RowMutations mutations) throws IOException { + try (Table table = conn.getTable(tableName)) { + table.mutateRow(mutations); } + } }