HBASE-22267 Implement client push back for async client

zhangduo 2019-04-21 11:58:52 +08:00
parent 542d1fcbc4
commit 65fb40d7ee
11 changed files with 472 additions and 244 deletions

AsyncBatchRpcRetryingCaller.java

@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,6 +136,10 @@ class AsyncBatchRpcRetryingCaller<T> {
         () -> new RegionRequest(loc)).actions.add(action);
     }
 
+    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
+      actionsByRegion.put(regionName, regionReq);
+    }
+
     public int getPriority() {
       return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
         .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
@@ -298,6 +304,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
       ServerName serverName, MultiResponse resp) {
+    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
+      serverName, resp);
     List<Action> failedActions = new ArrayList<>();
     MutableBoolean retryImmediately = new MutableBoolean(false);
     actionsByRegion.forEach((rn, regionReq) -> {
@@ -333,55 +341,88 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
   }
 
-  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
+  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
     long remainingNs;
     if (operationTimeoutNs > 0) {
       remainingNs = remainingTimeNs();
       if (remainingNs <= 0) {
-        failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
-          .flatMap(r -> r.actions.stream()), tries);
+        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
+          tries);
         return;
       }
     } else {
       remainingNs = Long.MAX_VALUE;
     }
-    actionsByServer.forEach((sn, serverReq) -> {
-      ClientService.Interface stub;
-      try {
-        stub = conn.getRegionServerStub(sn);
-      } catch (IOException e) {
-        onError(serverReq.actionsByRegion, tries, e, sn);
-        return;
-      }
-      ClientProtos.MultiRequest req;
-      List<CellScannable> cells = new ArrayList<>();
-      // Map from a created RegionAction to the original index for a RowMutations within
-      // the original list of actions. This will be used to process the results when there
-      // is RowMutations in the action list.
-      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
-      try {
-        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
-      } catch (IOException e) {
-        onError(serverReq.actionsByRegion, tries, e, sn);
-        return;
-      }
-      HBaseRpcController controller = conn.rpcControllerFactory.newController();
-      resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
-        calcPriority(serverReq.getPriority(), tableName));
-      if (!cells.isEmpty()) {
-        controller.setCellScanner(createCellScanner(cells));
-      }
-      stub.multi(controller, req, resp -> {
-        if (controller.failed()) {
-          onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
-        } else {
-          try {
-            onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
-              rowMutationsIndexMap, resp, controller.cellScanner()));
-          } catch (Exception e) {
-            onError(serverReq.actionsByRegion, tries, e, sn);
-            return;
-          }
-        }
-      });
+    ClientService.Interface stub;
+    try {
+      stub = conn.getRegionServerStub(serverName);
+    } catch (IOException e) {
+      onError(serverReq.actionsByRegion, tries, e, serverName);
+      return;
+    }
+    ClientProtos.MultiRequest req;
+    List<CellScannable> cells = new ArrayList<>();
+    // Map from a created RegionAction to the original index for a RowMutations within
+    // the original list of actions. This will be used to process the results when there
+    // is RowMutations in the action list.
+    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+    try {
+      req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
+    } catch (IOException e) {
+      onError(serverReq.actionsByRegion, tries, e, serverName);
+      return;
+    }
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
+      calcPriority(serverReq.getPriority(), tableName));
+    if (!cells.isEmpty()) {
+      controller.setCellScanner(createCellScanner(cells));
+    }
+    stub.multi(controller, req, resp -> {
+      if (controller.failed()) {
+        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
+      } else {
+        try {
+          onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
+            rowMutationsIndexMap, resp, controller.cellScanner()));
+        } catch (Exception e) {
+          onError(serverReq.actionsByRegion, tries, e, serverName);
+          return;
+        }
+      }
     });
   }
 
+  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
+  // based on the load of the region server and the region.
+  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
+    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
+    if (!optStats.isPresent()) {
+      actionsByServer.forEach((serverName, serverReq) -> {
+        metrics.ifPresent(MetricsConnection::incrNormalRunners);
+        sendToServer(serverName, serverReq, tries);
+      });
+      return;
+    }
+    ServerStatisticTracker stats = optStats.get();
+    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
+    actionsByServer.forEach((serverName, serverReq) -> {
+      ServerStatistics serverStats = stats.getStats(serverName);
+      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
+      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
+        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
+        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
+          .setRegionRequest(regionName, regionReq);
+      });
+      groupByBackoff.forEach((backoff, sr) -> {
+        if (backoff > 0) {
+          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
+          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
+            TimeUnit.MILLISECONDS);
+        } else {
+          metrics.ifPresent(MetricsConnection::incrNormalRunners);
+          sendToServer(serverName, sr, tries);
+        }
+      });
+    });
+  }
@@ -454,7 +495,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       }))
       .toArray(CompletableFuture[]::new)), (v, r) -> {
         if (!actionsByServer.isEmpty()) {
-          send(actionsByServer, tries);
+          sendOrDelay(actionsByServer, tries);
         }
         if (!locateFailed.isEmpty()) {
           tryResubmit(locateFailed.stream(), tries, false);
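The refactoring above is the heart of the change: send() became sendToServer() for a single server, and the new sendOrDelay() groups a server's region requests by the backoff the ClientBackoffPolicy reports for each region, sends zero-backoff groups immediately as normal runners, and reschedules each delayed group on the retry timer. A minimal stand-alone sketch of that grouping pattern, using a plain ScheduledExecutorService in place of the Netty retry timer and hypothetical backoffFor/sendBatch helpers (not HBase API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BackoffGroupingSketch {
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

  // backoffFor and sendBatch are stand-ins for ClientBackoffPolicy.getBackoffTime and the
  // actual multi RPC; they are illustrative only.
  long backoffFor(String region) {
    return region.hashCode() % 2 == 0 ? 0L : 150L;
  }

  void sendBatch(List<String> regions) {
    System.out.println("multi -> " + regions);
  }

  void sendOrDelay(List<String> regionsForOneServer) {
    // group the regions of a single server by the backoff reported for each of them
    Map<Long, List<String>> groupByBackoff = new HashMap<>();
    for (String region : regionsForOneServer) {
      groupByBackoff.computeIfAbsent(backoffFor(region), k -> new ArrayList<>()).add(region);
    }
    groupByBackoff.forEach((backoffMs, regions) -> {
      if (backoffMs > 0) {
        // delayed group: schedule the batch instead of blocking the calling thread
        timer.schedule(() -> sendBatch(regions), backoffMs, TimeUnit.MILLISECONDS);
      } else {
        // no pushback reported for these regions, send right away
        sendBatch(regions);
      }
    });
  }
}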

AsyncConnectionImpl.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
     new AtomicReference<>();
 
+  private final Optional<ServerStatisticTracker> stats;
+  private final ClientBackoffPolicy backoffPolicy;
+
   private ChoreService authService;
 
   private volatile boolean closed = false;
@@ -133,6 +138,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     } else {
       nonceGenerator = NO_NONCE_GENERATOR;
     }
+    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
+    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
@@ -233,6 +240,14 @@ class AsyncConnectionImpl implements AsyncConnection {
     masterStub.compareAndSet(stub, null);
   }
 
+  Optional<ServerStatisticTracker> getStatisticsTracker() {
+    return stats;
+  }
+
+  ClientBackoffPolicy getBackoffPolicy() {
+    return backoffPolicy;
+  }
+
   @Override
   public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
     return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
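The new stats and backoffPolicy fields only do something useful when pushback is enabled: ServerStatisticTracker.create(conf) is expected to return null (hence the Optional.ofNullable) unless client backpressure is turned on, and the factory is expected to fall back to a no-backoff policy by default. A hedged sketch of the configuration the rest of this commit assumes, using the same knobs the tests below set:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;

public class PushbackConfigSketch {
  public static Configuration pushbackEnabledConf() {
    Configuration conf = HBaseConfiguration.create();
    // opt in to client pushback; without this the statistics tracker above is expected to be
    // absent and every multi call is sent as a "normal runner"
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
    // use the exponential policy, as the tests in this commit do
    conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
      ClientBackoffPolicy.class);
    return conf;
  }
}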

AsyncRequestFutureImpl.java

@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -55,9 +56,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-
 /**
  * The context, and return value, for a single submit/submitAll call.
  * Note on how this class (one AP submit) works. Initially, all requests are split into groups
@@ -614,8 +612,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
       traceText = "AsyncProcess.clientBackoff.sendMultiAction";
       runnable = runner;
       if (asyncProcess.connection.getConnectionMetrics() != null) {
-        asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
-        asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
+        asyncProcess.connection.getConnectionMetrics()
+          .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
       }
     } else {
       if (asyncProcess.connection.getConnectionMetrics() != null) {
@@ -802,19 +800,16 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
    * @param responses - the response, if any
    * @param numAttempt - the attempt
    */
-  private void receiveMultiAction(MultiAction multiAction,
-      ServerName server, MultiResponse responses, int numAttempt) {
+  private void receiveMultiAction(MultiAction multiAction, ServerName server,
+      MultiResponse responses, int numAttempt) {
     assert responses != null;
-
-    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
-    updateStats(server, results);
-
+    updateStats(server, responses);
     // Success or partial success
     // Analyze detailed results. We can still have individual failures to be redo.
     // two specific throwables are managed:
     //  - DoNotRetryIOException: we continue to retry for other actions
     //  - RegionMovedException: we update the cache with the new region location
+    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
     List<Action> toReplay = new ArrayList<>();
     Throwable lastException = null;
     int failureCount = 0;
@@ -926,26 +921,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   }
 
   @VisibleForTesting
-  protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
-    boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
-    boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
-    if (!stats && !metrics) {
-      return;
-    }
-    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
-      byte[] regionName = regionStats.getKey();
-      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
-      if (stat == null) {
-        LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
-          + ", region=" + Bytes.toStringBinary(regionName));
-        continue;
-      }
-      RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
-      ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
-        regionName, regionLoadstats);
-      ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
-        server, regionName, regionLoadstats);
-    }
+  protected void updateStats(ServerName server, MultiResponse resp) {
+    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
+      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
   }

ConnectionUtils.java

@@ -62,8 +62,10 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -672,4 +674,25 @@ public final class ConnectionUtils {
       }
     }
   }
+
+  static void updateStats(Optional<ServerStatisticTracker> optStats,
+      Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) {
+    if (!optStats.isPresent() && !optMetrics.isPresent()) {
+      // ServerStatisticTracker and MetricsConnection are both not present, just return
+      return;
+    }
+    resp.getResults().forEach((regionName, regionResult) -> {
+      ClientProtos.RegionLoadStats stat = regionResult.getStat();
+      if (stat == null) {
+        LOG.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName,
+          Bytes.toStringBinary(regionName));
+        return;
+      }
+      RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat);
+      optStats.ifPresent(
+        stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats));
+      optMetrics.ifPresent(
+        metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats));
+    });
+  }
 }

MetricsConnection.java

@@ -421,13 +421,9 @@ public class MetricsConnection implements StatisticTrackable {
     this.runnerStats.incrNormalRunners();
   }
 
-  /** Increment the number of delay runner counts. */
-  public void incrDelayRunners() {
+  /** Increment the number of delay runner counts and update delay interval of delay runner. */
+  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
     this.runnerStats.incrDelayRunners();
-  }
-
-  /** Update delay interval of delay runner. */
-  public void updateDelayInterval(long interval) {
     this.runnerStats.updateDelayInterval(interval);
   }

RawAsyncTableImpl.java

@@ -357,8 +357,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       preCheck();
       return RawAsyncTableImpl.this
         .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
-          stub, mutation,
+        .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
+          loc, stub, mutation,
           (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
             new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
           resp -> resp.getExists()))
@@ -373,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
   // so here I write a new method as I do not want to change the abstraction of call method.
-  private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
+  private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
       HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
       Converter<MultiRequest, byte[], RowMutations> reqConvert,
       Function<Result, RESP> respConverter) {
@@ -391,6 +391,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         try {
           org.apache.hadoop.hbase.client.MultiResponse multiResp =
             ResponseConverter.getResults(req, resp, controller.cellScanner());
+          ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
+            loc.getServerName(), multiResp);
           Throwable ex = multiResp.getException(regionName);
           if (ex != null) {
             future.completeExceptionally(ex instanceof IOException ? ex
@@ -415,8 +417,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
     return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
-      .action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
-        mutation, (rn, rm) -> {
+      .action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
+        (rn, rm) -> {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();

ServerStatisticTracker.java

@@ -19,15 +19,12 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Tracks the statistics for multiple regions
@@ -53,9 +50,4 @@ public class ServerStatisticTracker implements StatisticTrackable {
     }
     return new ServerStatisticTracker();
   }
-
-  @VisibleForTesting
-  ServerStatistics getServerStatsForTesting(ServerName server) {
-    return stats.get(server);
-  }
 }

TestAsyncProcess.java

@@ -271,7 +271,7 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
+    protected void updateStats(ServerName server, MultiResponse resp) {
       // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
     }

ClientPushbackTestBase.java (new file)

@@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that we can actually send and use region metrics to slowdown client writes
*/
public abstract class ClientPushbackTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ClientPushbackTestBase.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected static final TableName tableName = TableName.valueOf("client-pushback");
private static final byte[] family = Bytes.toBytes("f");
private static final byte[] qualifier = Bytes.toBytes("q");
private static final long flushSizeBytes = 512;
@BeforeClass
public static void setupCluster() throws Exception {
Configuration conf = UTIL.getConfiguration();
// enable backpressure
conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
// use the exponential backoff policy
conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
ClientBackoffPolicy.class);
// turn the memstore size way down so we don't need to write a lot to see changes in memstore
// load
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
// ensure we block the flushes when we are double that flushsize
conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
UTIL.startMiniCluster(1);
UTIL.createTable(tableName, family);
}
@AfterClass
public static void cleanupCluster() throws Exception {
UTIL.shutdownMiniCluster();
}
protected abstract ClientBackoffPolicy getBackoffPolicy() throws IOException;
protected abstract ServerStatisticTracker getStatisticsTracker() throws IOException;
protected abstract MetricsConnection getConnectionMetrics() throws IOException;
protected abstract void mutate(Put put) throws IOException;
protected abstract void mutate(Put put, AtomicLong endTime, CountDownLatch latch)
throws IOException;
protected abstract void mutateRow(RowMutations mutations) throws IOException;
@Test
public void testClientTracksServerPushback() throws Exception {
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
Region region = rs.getRegions(tableName).get(0);
LOG.debug("Writing some data to " + tableName);
// write some data
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qualifier, Bytes.toBytes("value1"));
mutate(p);
// get the current load on RS. Hopefully memstore isn't flushed since we wrote the data
int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes);
LOG.debug("Done writing some data to " + tableName);
// get the stats for the region hosting our table
ClientBackoffPolicy backoffPolicy = getBackoffPolicy();
assertTrue("Backoff policy is not correctly configured",
backoffPolicy instanceof ExponentialClientBackoffPolicy);
ServerStatisticTracker stats = getStatisticsTracker();
assertNotNull("No stats configured for the client!", stats);
// get the names so we can query the stats
ServerName server = rs.getServerName();
byte[] regionName = region.getRegionInfo().getRegionName();
// check to see we found some load on the memstore
ServerStatistics serverStats = stats.getStats(server);
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
assertEquals("We did not find some load on the memstore", load,
regionStats.getMemStoreLoadPercent());
// check that the load reported produces a nonzero delay
long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
assertNotEquals("Reported load does not produce a backoff", 0, backoffTime);
LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " +
server + " is " + backoffTime);
CountDownLatch latch = new CountDownLatch(1);
AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
mutate(p, endTime, latch);
// Currently the ExponentialClientBackoffPolicy under these test conditions
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
// time reported by above debug logging has significantly deviated.
MetricsConnection metrics = getConnectionMetrics();
String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
MetricsConnection.RegionStats rsStats = metrics.serverStats.get(server).get(regionName);
assertEquals(name, rsStats.name);
assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
(double) regionStats.getHeapOccupancyPercent(), 0.1);
assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
(double) regionStats.getMemStoreLoadPercent(), 0.1);
MetricsConnection.RunnerStats runnerStats = metrics.runnerStats;
assertEquals(1, runnerStats.delayRunners.getCount());
assertEquals(1, runnerStats.normalRunners.getCount());
assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), (double) backoffTime,
0.1);
latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get());
assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
}
@Test
public void testMutateRowStats() throws IOException {
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
Region region = rs.getRegions(tableName).get(0);
RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qualifier, Bytes.toBytes("value2"));
mutations.add(p);
mutateRow(mutations);
ServerStatisticTracker stats = getStatisticsTracker();
assertNotNull("No stats configured for the client!", stats);
// get the names so we can query the stats
ServerName server = rs.getServerName();
byte[] regionName = region.getRegionInfo().getRegionName();
// check to see we found some load on the memstore
ServerStatistics serverStats = stats.getStats(server);
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
assertNotNull(regionStats);
assertTrue(regionStats.getMemStoreLoadPercent() > 0);
}
}
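The load assertion in testClientTracksServerPushback leans on the tiny flush size configured in setupCluster(): with HREGION_MEMSTORE_FLUSH_SIZE forced down to 512 bytes, even a single small Put pushes the reported memstore load percent well past 100, which is what makes the exponential policy return the non-zero backoff (around 151 ms under these conditions, per the comment in the test). Rough arithmetic with an assumed memstore footprint:

public class LoadPercentSketch {
  public static void main(String[] args) {
    long flushSizeBytes = 512;    // HREGION_MEMSTORE_FLUSH_SIZE as set in setupCluster()
    long memStoreHeapSize = 1024; // assumed heap footprint after a single small Put
    // same formula the test uses to predict the server-reported load
    int load = (int) ((memStoreHeapSize * 100) / flushSizeBytes);
    System.out.println("memstore load percent = " + load); // -> 200
  }
}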

TestAsyncClientPushback.java (new file)

@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPushback extends ClientPushbackTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncClientPushback.class);
private AsyncConnectionImpl conn;
private AsyncBufferedMutator mutator;
@Before
public void setUp() throws Exception {
conn =
(AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
mutator = conn.getBufferedMutator(tableName);
}
@After
public void tearDown() throws IOException {
Closeables.close(mutator, true);
Closeables.close(conn, true);
}
@Override
protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
return conn.getBackoffPolicy();
}
@Override
protected ServerStatisticTracker getStatisticsTracker() throws IOException {
return conn.getStatisticsTracker().get();
}
@Override
protected MetricsConnection getConnectionMetrics() throws IOException {
return conn.getConnectionMetrics().get();
}
@Override
protected void mutate(Put put) throws IOException {
CompletableFuture<?> future = mutator.mutate(put);
mutator.flush();
future.join();
}
@Override
protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException {
FutureUtils.addListener(mutator.mutate(put), (r, e) -> {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();
});
mutator.flush();
}
@Override
protected void mutateRow(RowMutations mutations) throws IOException {
conn.getTable(tableName).mutateRow(mutations).join();
}
}

TestClientPushback.java

@@ -17,193 +17,90 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * Test that we can actually send and use region metrics to slowdown client writes
- */
-@Category(MediumTests.class)
-public class TestClientPushback {
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestClientPushback extends ClientPushbackTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestClientPushback.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestClientPushback.class);
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static final TableName tableName = TableName.valueOf("client-pushback");
-  private static final byte[] family = Bytes.toBytes("f");
-  private static final byte[] qualifier = Bytes.toBytes("q");
-  private static final long flushSizeBytes = 512;
+  private ConnectionImplementation conn;
+
+  private BufferedMutatorImpl mutator;
 
-  @BeforeClass
-  public static void setupCluster() throws Exception{
-    Configuration conf = UTIL.getConfiguration();
-    // enable backpressure
-    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
-    // use the exponential backoff policy
-    conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
-      ClientBackoffPolicy.class);
-    // turn the memstore size way down so we don't need to write a lot to see changes in memstore
-    // load
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
-    // ensure we block the flushes when we are double that flushsize
-    conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
-    conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
-    UTIL.startMiniCluster(1);
-    UTIL.createTable(tableName, family);
+  @Before
+  public void setUp() throws IOException {
+    conn = (ConnectionImplementation) ConnectionFactory.createConnection(UTIL.getConfiguration());
+    mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
   }
 
-  @AfterClass
-  public static void teardownCluster() throws Exception{
-    UTIL.shutdownMiniCluster();
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(mutator, true);
+    Closeables.close(conn, true);
   }
 
-  @Test
-  public void testClientTracksServerPushback() throws Exception{
-    Configuration conf = UTIL.getConfiguration();
-
-    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
-
-    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
-    Region region = rs.getRegions(tableName).get(0);
-
-    LOG.debug("Writing some data to "+tableName);
-    // write some data
-    Put p = new Put(Bytes.toBytes("row"));
-    p.addColumn(family, qualifier, Bytes.toBytes("value1"));
-    mutator.mutate(p);
-    mutator.flush();
-
-    // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data
-    int load = (int) ((region.getMemStoreHeapSize() * 100)
-        / flushSizeBytes);
-    LOG.debug("Done writing some data to "+tableName);
-
-    // get the stats for the region hosting our table
-    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
-    assertTrue("Backoff policy is not correctly configured",
-      backoffPolicy instanceof ExponentialClientBackoffPolicy);
-
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
-    // get the names so we can query the stats
-    ServerName server = rs.getServerName();
-    byte[] regionName = region.getRegionInfo().getRegionName();
-
-    // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
-    ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
-    assertEquals("We did not find some load on the memstore", load,
-      regionStats.getMemStoreLoadPercent());
-
-    // check that the load reported produces a nonzero delay
-    long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
-    assertNotEquals("Reported load does not produce a backoff", 0, backoffTime);
-    LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " +
-      server + " is " + backoffTime);
+  @Override
+  protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
+    return conn.getBackoffPolicy();
+  }
+
+  @Override
+  protected ServerStatisticTracker getStatisticsTracker() throws IOException {
+    return conn.getStatisticsTracker();
+  }
+
+  @Override
+  protected MetricsConnection getConnectionMetrics() throws IOException {
+    return conn.getConnectionMetrics();
+  }
+
+  @Override
+  protected void mutate(Put put) throws IOException {
+    mutator.mutate(put);
+    mutator.flush();
+  }
 
+  @Override
+  protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException {
     // Reach into the connection and submit work directly to AsyncProcess so we can
     // monitor how long the submission was delayed via a callback
     List<Row> ops = new ArrayList<>(1);
-    ops.add(p);
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicLong endTime = new AtomicLong();
-    long startTime = EnvironmentEdgeManager.currentTime();
+    ops.add(put);
     Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
       endTime.set(EnvironmentEdgeManager.currentTime());
       latch.countDown();
     };
-    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
-            .setPool(mutator.getPool())
-            .setTableName(tableName)
-            .setRowAccess(ops)
-            .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-            .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
-            .setRpcTimeout(60 * 1000)
-            .build();
+    AsyncProcessTask<Result> task =
+      AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName)
+        .setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+        .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+        .setRpcTimeout(60 * 1000).build();
     mutator.getAsyncProcess().submit(task);
-
-    // Currently the ExponentialClientBackoffPolicy under these test conditions
-    // produces a backoffTime of 151 milliseconds. This is long enough so the
-    // wait and related checks below are reasonable. Revisit if the backoff
-    // time reported by above debug logging has significantly deviated.
-    String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
-    MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
-            serverStats.get(server).get(regionName);
-    assertEquals(name, rsStats.name);
-    assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
-        (double)regionStats.getHeapOccupancyPercent(), 0.1 );
-    assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
-        (double)regionStats.getMemStoreLoadPercent(), 0.1);
-
-    MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats;
-    assertEquals(1, runnerStats.delayRunners.getCount());
-    assertEquals(1, runnerStats.normalRunners.getCount());
-    assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
-        (double)backoffTime, 0.1);
-
-    latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
-    assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get());
-    assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
   }
 
-  @Test
-  public void testMutateRowStats() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(tableName);
-    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
-    Region region = rs.getRegions(tableName).get(0);
-
-    RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
-    Put p = new Put(Bytes.toBytes("row"));
-    p.addColumn(family, qualifier, Bytes.toBytes("value2"));
-    mutations.add(p);
-    table.mutateRow(mutations);
-
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
-    // get the names so we can query the stats
-    ServerName server = rs.getServerName();
-    byte[] regionName = region.getRegionInfo().getRegionName();
-
-    // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
-    ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
-    assertNotNull(regionStats);
-    assertTrue(regionStats.getMemStoreLoadPercent() > 0);
+  @Override
+  protected void mutateRow(RowMutations mutations) throws IOException {
+    try (Table table = conn.getTable(tableName)) {
+      table.mutateRow(mutations);
+    }
   }
 }
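Taken together, these changes give the async client the same pushback behaviour the blocking client already had: when a region server reports memstore pressure, the next multi call for that region is delayed on the client side instead of piling onto the server. A sketch of what that looks like from application code, assembled from the pieces in this commit (illustrative table, family and qualifier names, not a prescribed pattern):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncPushbackUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); // opt in to client pushback
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
        AsyncBufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("t"))) {
      Put put = new Put(Bytes.toBytes("row"));
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      CompletableFuture<?> future = mutator.mutate(put);
      mutator.flush();
      future.join(); // completes after any server-advised delay plus the actual RPC
    }
  }
}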