HBASE-12730 Backport HBASE-5162 (Basic client pushback mechanism) to branch-1

This commit is contained in:
Andrew Purtell 2015-01-22 14:50:40 -08:00
parent 05f4e0c715
commit 04a003d6a2
29 changed files with 1984 additions and 89 deletions

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -313,7 +315,8 @@ class AsyncProcess {
* Uses default ExecutorService for this AP (must have been created with one). * Uses default ExecutorService for this AP (must have been created with one).
*/ */
public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException { boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
throws InterruptedIOException {
return submit(null, tableName, rows, atLeastOne, callback, needResults); return submit(null, tableName, rows, atLeastOne, callback, needResults);
} }
@ -374,7 +377,7 @@ class AsyncProcess {
locationErrors = new ArrayList<Exception>(); locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>(); locationErrorRows = new ArrayList<Integer>();
LOG.error("Failed to get region location ", ex); LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Add it to retained but do not add to submit list. // This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state. // We will then add it to ars in an already-failed state.
retainedActions.add(new Action<Row>(r, ++posInList)); retainedActions.add(new Action<Row>(r, ++posInList));
locationErrors.add(ex); locationErrors.add(ex);
@ -918,14 +921,12 @@ class AsyncProcess {
return loc; return loc;
} }
/** /**
* Send a multi action structure to the servers, after a delay depending on the attempt * Send a multi action structure to the servers, after a delay depending on the attempt
* number. Asynchronous. * number. Asynchronous.
* *
* @param actionsByServer the actions structured by regions * @param actionsByServer the actions structured by regions
* @param numAttempt the attempt number. * @param numAttempt the attempt number.
* @param actionsForReplicaThread original actions for replica thread; null on non-first call. * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/ */
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer, private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
@ -935,33 +936,98 @@ class AsyncProcess {
int actionsRemaining = actionsByServer.size(); int actionsRemaining = actionsByServer.size();
// This iteration is by server (the HRegionLocation comparator is by server portion only). // This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) { for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
final ServerName server = e.getKey(); ServerName server = e.getKey();
final MultiAction<Row> multiAction = e.getValue(); MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server); incTaskCounters(multiAction.getRegions(), server);
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
new SingleServerRequestRunnable(multiAction, numAttempt, server)); numAttempt);
if ((--actionsRemaining == 0) && reuseThread) { // make sure we correctly count the number of runnables before we try to reuse the send
runnable.run(); // thread, in case we had to split the request into different runnables because of backoff
} else { if (runnables.size() > actionsRemaining) {
try { actionsRemaining = runnables.size();
pool.submit(runnable); }
} catch (RejectedExecutionException ree) {
// This should never happen. But as the pool is provided by the end user, let's secure // run all the runnables
// this a little. for (Runnable runnable : runnables) {
decTaskCounters(multiAction.getRegions(), server); if ((--actionsRemaining == 0) && reuseThread) {
LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + runnable.run();
" Server is " + server.getServerName(), ree); } else {
// We're likely to fail again, but this will increment the attempt counter, so it will try {
// finish. pool.submit(runnable);
receiveGlobalFailure(multiAction, server, numAttempt, ree); } catch (RejectedExecutionException ree) {
// This should never happen. But as the pool is provided by the end user, let's secure
// this a little.
decTaskCounters(multiAction.getRegions(), server);
LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
" Server is " + server.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
receiveGlobalFailure(multiAction, server, numAttempt, ree);
}
} }
} }
} }
if (actionsForReplicaThread != null) { if (actionsForReplicaThread != null) {
startWaitingForReplicaCalls(actionsForReplicaThread); startWaitingForReplicaCalls(actionsForReplicaThread);
} }
} }
private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
MultiAction<Row> multiAction,
int numAttempt) {
// no stats to manage, just do the standard action
if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server)));
}
// group the actions by the amount of delay
Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
.size());
// split up the actions
for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
Long backoff = getBackoff(server, e.getKey());
DelayingRunner runner = actions.get(backoff);
if (runner == null) {
actions.put(backoff, new DelayingRunner(backoff, e));
} else {
runner.add(e);
}
}
List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
for (DelayingRunner runner : actions.values()) {
String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable =
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
// use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) {
runner.setRunner(runnable);
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner;
}
runnable = Trace.wrap(traceText, runnable);
toReturn.add(runnable);
}
return toReturn;
}
/**
* @param server server location where the target region is hosted
* @param regionName name of the region which we are going to write some data
* @return the amount of time the client should wait until it submit a request to the
* specified server and region
*/
private Long getBackoff(ServerName server, byte[] regionName) {
ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
ServerStatistics stats = tracker.getStats(server);
return AsyncProcess.this.connection.getBackoffPolicy()
.getBackoffTime(server, regionName, stats);
}
/** /**
* Starts waiting to issue replica calls on a different thread; or issues them immediately. * Starts waiting to issue replica calls on a different thread; or issues them immediately.
*/ */
@ -1169,6 +1235,13 @@ class AsyncProcess {
++failed; ++failed;
} }
} else { } else {
// update the stats about the region, if its a user table. We don't want to slow down
// updates to meta tables, especially from internal updates (master, etc).
if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
result = ResultStatsUtil.updateStats(result,
AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
}
if (callback != null) { if (callback != null) {
try { try {
//noinspection unchecked //noinspection unchecked
@ -1497,7 +1570,6 @@ class AsyncProcess {
} }
} }
@VisibleForTesting @VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@ -288,5 +290,14 @@ public interface ClusterConnection extends HConnection {
* @return true if this is a managed connection. * @return true if this is a managed connection.
*/ */
boolean isManaged(); boolean isManaged();
}
/**
* @return the current statistics tracker associated with this connection
*/
ServerStatisticTracker getStatisticsTracker();
/**
* @return the configured client backoff policy
*/
ClientBackoffPolicy getBackoffPolicy();
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@ -442,4 +444,14 @@ abstract class ConnectionAdapter implements ClusterConnection {
public boolean isManaged() { public boolean isManaged() {
return wrappedConnection.isManaged(); return wrappedConnection.isManaged();
} }
@Override
public ServerStatisticTracker getStatisticsTracker() {
return wrappedConnection.getStatisticsTracker();
}
@Override
public ClientBackoffPolicy getBackoffPolicy() {
return wrappedConnection.getBackoffPolicy();
}
} }

View File

@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -537,6 +539,8 @@ class ConnectionManager {
final int rpcTimeout; final int rpcTimeout;
private NonceGenerator nonceGenerator = null; private NonceGenerator nonceGenerator = null;
private final AsyncProcess asyncProcess; private final AsyncProcess asyncProcess;
// single tracker per connection
private final ServerStatisticTracker stats;
private volatile boolean closed; private volatile boolean closed;
private volatile boolean aborted; private volatile boolean aborted;
@ -592,6 +596,8 @@ class ConnectionManager {
*/ */
Registry registry; Registry registry;
private final ClientBackoffPolicy backoffPolicy;
HConnectionImplementation(Configuration conf, boolean managed) throws IOException { HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this(conf, managed, null, null); this(conf, managed, null, null);
} }
@ -666,9 +672,11 @@ class ConnectionManager {
} else { } else {
this.nonceGenerator = new NoNonceGenerator(); this.nonceGenerator = new NoNonceGenerator();
} }
stats = ServerStatisticTracker.create(conf);
this.asyncProcess = createAsyncProcess(this.conf); this.asyncProcess = createAsyncProcess(this.conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
} }
@Override @Override
@ -2184,7 +2192,8 @@ class ConnectionManager {
protected AsyncProcess createAsyncProcess(Configuration conf) { protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available. // No default pool available.
return new AsyncProcess(this, conf, this.batchPool, return new AsyncProcess(this, conf, this.batchPool,
RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf)); RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
RpcControllerFactory.instantiate(conf));
} }
@Override @Override
@ -2192,6 +2201,16 @@ class ConnectionManager {
return asyncProcess; return asyncProcess;
} }
@Override
public ServerStatisticTracker getStatisticsTracker() {
return this.stats;
}
@Override
public ClientBackoffPolicy getBackoffPolicy() {
return this.backoffPolicy;
}
/* /*
* Return the number of cached region for a table. It will only be called * Return the number of cached region for a table. It will only be called
* from a unit test. * from a unit test.
@ -2469,7 +2488,8 @@ class ConnectionManager {
@Override @Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor); return RpcRetryingCallerFactory
.instantiate(conf, this.interceptor, this.getStatisticsTracker());
} }
@Override @Override

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import java.util.List;
import java.util.Map;
/**
* A wrapper for a runnable for a group of actions for a single regionserver.
* <p>
* This can be used to build up the actions that should be taken and then
* </p>
* <p>
* This class exists to simulate using a ScheduledExecutorService with just a regular
* ExecutorService and Runnables. It is used for legacy reasons in the the client; this could
* only be removed if we change the expectations in HTable around the pool the client is able to
* pass in and even if we deprecate the current APIs would require keeping this class around
* for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
* </p>
*/
@InterfaceAudience.Private
public class DelayingRunner<T> implements Runnable {
private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
private final Object sleepLock = new Object();
private boolean triggerWake = false;
private long sleepTime;
private MultiAction<T> actions = new MultiAction<T>();
private Runnable runnable;
public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
this.sleepTime = sleepTime;
add(e);
}
public void setRunner(Runnable runner) {
this.runnable = runner;
}
@Override
public void run() {
if (!sleep()) {
LOG.warn(
"Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
}
//TODO maybe we should consider switching to a listenableFuture for the actual callable and
// then handling the results/errors as callbacks. That way we can decrement outstanding tasks
// even if we get interrupted here, but for now, we still need to run so we decrement the
// outstanding tasks
this.runnable.run();
}
/**
* Sleep for an expected amount of time.
* <p>
* This is nearly a copy of what the Sleeper does, but with the ability to know if you
* got interrupted while sleeping.
* </p>
*
* @return <tt>true</tt> if the sleep completely entirely successfully,
* but otherwise <tt>false</tt> if the sleep was interrupted.
*/
private boolean sleep() {
long now = EnvironmentEdgeManager.currentTime();
long startTime = now;
long waitTime = sleepTime;
while (waitTime > 0) {
long woke = -1;
try {
synchronized (sleepLock) {
if (triggerWake) break;
sleepLock.wait(waitTime);
}
woke = EnvironmentEdgeManager.currentTime();
} catch (InterruptedException iex) {
return false;
}
// Recalculate waitTime.
woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke;
waitTime = waitTime - (woke - startTime);
}
return true;
}
public void add(Map.Entry<byte[], List<Action<T>>> e) {
actions.add(e.getKey(), e.getValue());
}
public MultiAction<T> getActions() {
return actions;
}
public long getSleepTime() {
return sleepTime;
}
}

View File

@ -1872,8 +1872,9 @@ public class HTable implements HTableInterface {
AsyncProcess asyncProcess = AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration, pool, new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration), true, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
RpcControllerFactory.instantiate(configuration)); true, RpcControllerFactory.instantiate(configuration));
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() { new Callback<ClientProtos.CoprocessorServiceResult>() {
@Override @Override

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -68,12 +69,24 @@ public final class MultiAction<R> {
* @param a * @param a
*/ */
public void add(byte[] regionName, Action<R> a) { public void add(byte[] regionName, Action<R> a) {
add(regionName, Arrays.asList(a));
}
/**
* Add an Action to this container based on it's regionName. If the regionName
* is wrong, the initial execution will fail, but will be automatically
* retried after looking up the correct region.
*
* @param regionName
* @param actionList list of actions to add for the region
*/
public void add(byte[] regionName, List<Action<R>> actionList){
List<Action<R>> rsActions = actions.get(regionName); List<Action<R>> rsActions = actions.get(regionName);
if (rsActions == null) { if (rsActions == null) {
rsActions = new ArrayList<Action<R>>(); rsActions = new ArrayList<Action<R>>(actionList.size());
actions.put(regionName, rsActions); actions.put(regionName, rsActions);
} }
rsActions.add(a); rsActions.addAll(actionList);
} }
public void setNonceGroup(long nonceGroup) { public void setNonceGroup(long nonceGroup) {

View File

@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -94,6 +97,7 @@ public class Result implements CellScannable, CellScanner {
* Index for where we are when Result is acting as a {@link CellScanner}. * Index for where we are when Result is acting as a {@link CellScanner}.
*/ */
private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
private ClientProtos.RegionLoadStats stats;
/** /**
* Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}.
@ -871,4 +875,20 @@ public class Result implements CellScannable, CellScanner {
public boolean isStale() { public boolean isStale() {
return stale; return stale;
} }
/**
* Add load information about the region to the information about the result
* @param loadStats statistics about the current region from which this was returned
*/
public void addResults(ClientProtos.RegionLoadStats loadStats) {
this.stats = loadStats;
}
/**
* @return the associated statistics about the region from which this was returned. Can be
* <tt>null</tt> if stats are disabled.
*/
public ClientProtos.RegionLoadStats getStats() {
return stats;
}
} }

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
/**
* A {@link Result} with some statistics about the server/region status
*/
@InterfaceAudience.Private
public final class ResultStatsUtil {
private ResultStatsUtil() {
//private ctor for util class
}
/**
* Update the stats for the specified region if the result is an instance of {@link
* ResultStatsUtil}
*
* @param r object that contains the result and possibly the statistics about the region
* @param serverStats stats tracker to update from the result
* @param server server from which the result was obtained
* @param regionName full region name for the stats.
* @return the underlying {@link Result} if the passed result is an {@link
* ResultStatsUtil} or just returns the result;
*/
public static <T> T updateStats(T r, ServerStatisticTracker serverStats,
ServerName server, byte[] regionName) {
if (!(r instanceof Result)) {
return r;
}
Result result = (Result) r;
// early exit if there are no stats to collect
ClientProtos.RegionLoadStats stats = result.getStats();
if(stats == null){
return r;
}
if (regionName != null) {
serverStats.updateRegionStats(server, regionName, stats);
}
return r;
}
public static <T> T updateStats(T r, ServerStatisticTracker stats,
HRegionLocation regionLocation) {
byte[] regionName = null;
ServerName server = null;
if (regionLocation != null) {
server = regionLocation.getServerName();
regionName = regionLocation.getRegionInfo().getRegionName();
}
return updateStats(r, stats, server, regionName);
}
}

View File

@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory {
private final int retries; private final int retries;
private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt; private final int startLogErrorsCnt;
private final boolean enableBackPressure;
private ServerStatisticTracker stats;
public RpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
@ -49,27 +51,57 @@ public class RpcRetryingCallerFactory {
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor; this.interceptor = interceptor;
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
}
/**
* Set the tracker that should be used for tracking statistics about the server
*/
public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
this.stats = statisticTracker;
} }
public <T> RpcRetryingCaller<T> newCaller() { public <T> RpcRetryingCaller<T> newCaller() {
// We store the values in the factory instance. This way, constructing new objects // We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure. // is cheap as it does not require parsing a complex structure.
return new RpcRetryingCaller<T>(pause, retries, interceptor, startLogErrorsCnt); RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
startLogErrorsCnt);
// wrap it with stats, if we are tracking them
if (enableBackPressure && this.stats != null) {
caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, interceptor,
startLogErrorsCnt, stats);
}
return caller;
} }
public static RpcRetryingCallerFactory instantiate(Configuration configuration) { public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
} }
public static RpcRetryingCallerFactory instantiate(Configuration configuration, public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor) { ServerStatisticTracker stats) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
String clazzName = RpcRetryingCallerFactory.class.getName(); String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz = String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) { if (rpcCallerFactoryClazz.equals(clazzName)) {
return new RpcRetryingCallerFactory(configuration, interceptor); factory = new RpcRetryingCallerFactory(configuration, interceptor);
} else {
factory = ReflectionUtils.instantiateWithCustomCtor(
rpcCallerFactoryClazz, new Class[] { Configuration.class },
new Object[] { configuration });
} }
return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration }); // setting for backwards compat with existing caller factories, rather than in the ctor
factory.setStatisticTracker(stats);
return factory;
} }
} }

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Tracks the statistics for multiple regions
*/
@InterfaceAudience.Private
public class ServerStatisticTracker {
private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
new ConcurrentHashMap<ServerName, ServerStatistics>();
public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
currentStats) {
ServerStatistics stat = stats.get(server);
if (stat == null) {
stat = stats.get(server);
// We don't have stats for that server yet, so we need to make an entry.
// If we race with another thread it's a harmless unnecessary allocation.
if (stat == null) {
stat = new ServerStatistics();
ServerStatistics old = stats.putIfAbsent(server, stat);
if (old != null) {
stat = old;
}
}
}
stat.update(region, currentStats);
}
public ServerStatistics getStats(ServerName server) {
return this.stats.get(server);
}
public static ServerStatisticTracker create(Configuration conf) {
if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) {
return null;
}
return new ServerStatisticTracker();
}
@VisibleForTesting
ServerStatistics getServerStatsForTesting(ServerName server) {
return stats.get(server);
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
/**
* An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
* if stats are available
*/
@InterfaceAudience.Private
public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> {
private final ServerStatisticTracker stats;
public StatsTrackingRpcRetryingCaller(long pause, int retries, int startLogErrorsCnt,
ServerStatisticTracker stats) {
super(pause, retries, startLogErrorsCnt);
this.stats = stats;
}
public StatsTrackingRpcRetryingCaller(long pause, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt,
ServerStatisticTracker stats) {
super(pause, retries, interceptor, startLogErrorsCnt);
this.stats = stats;
}
@Override
public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
T result = super.callWithRetries(callable, callTimeout);
return updateStatsAndUnwrap(result, callable);
}
@Override
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
T result = super.callWithRetries(callable, callTimeout);
return updateStatsAndUnwrap(result, callable);
}
private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
// don't track stats about requests that aren't to regionservers
if (!(callable instanceof RegionServerCallable)) {
return result;
}
// mutli-server callables span multiple regions, so they don't have a location,
// but they are region server callables, so we have to handle them when we process the
// result in AsyncProcess#receiveMultiAction, not in here
if (callable instanceof MultiServerCallable) {
return result;
}
// update the stats for the single server callable
RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
HRegionLocation location = regionCallable.getLocation();
return ResultStatsUtil.updateStats(result, stats, location);
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Configurable policy for the amount of time a client should wait for a new request to the
* server when given the server load statistics.
* <p>
* Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration}
* </p>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ClientBackoffPolicy {
public static final String BACKOFF_POLICY_CLASS =
"hbase.client.statistics.backoff-policy";
/**
* @return the number of ms to wait on the client based on the
*/
public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ClientBackoffPolicyFactory {
private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class);
private ClientBackoffPolicyFactory() {
}
public static ClientBackoffPolicy create(Configuration conf) {
// create the backoff policy
String className =
conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class
.getName());
return ReflectionUtils.instantiateWithCustomCtor(className,
new Class<?>[] { Configuration.class }, new Object[] { conf });
}
/**
* Default backoff policy that doesn't create any backoff for the client, regardless of load
*/
public static class NoBackoffPolicy implements ClientBackoffPolicy {
public NoBackoffPolicy(Configuration conf){
// necessary to meet contract
}
@Override
public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
return 0;
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Simple exponential backoff policy on for the client that uses a percent^4 times the
* max backoff to generate the backoff time.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class);
private static final long ONE_MINUTE = 60 * 1000;
public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max";
private long maxBackoff;
public ExponentialClientBackoffPolicy(Configuration conf) {
this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
}
@Override
public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
// no stats for the server yet, so don't backoff
if (stats == null) {
return 0;
}
ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region);
// no stats for the region yet - don't backoff
if (regionStats == null) {
return 0;
}
// square the percent as a value less than 1. Closer we move to 100 percent,
// the percent moves to 1, but squaring causes the exponential curve
double percent = regionStats.getMemstoreLoadPercent() / 100.0;
double multiplier = Math.pow(percent, 4.0);
// shouldn't ever happen, but just incase something changes in the statistic data
if (multiplier > 1) {
LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " +
"down to the max backoff");
multiplier = 1;
}
return (long) (multiplier * maxBackoff);
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Map;
import java.util.TreeMap;
/**
* Track the statistics for a single region
*/
@InterfaceAudience.Private
public class ServerStatistics {
private Map<byte[], RegionStatistics>
stats = new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR);
/**
* Good enough attempt. Last writer wins. It doesn't really matter which one gets to update,
* as something gets set
* @param region
* @param currentStats
*/
public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) {
RegionStatistics regionStat = this.stats.get(region);
if(regionStat == null){
regionStat = new RegionStatistics();
this.stats.put(region, regionStat);
}
regionStat.update(currentStats);
}
@InterfaceAudience.Private
public RegionStatistics getStatsForRegion(byte[] regionName){
return stats.get(regionName);
}
public static class RegionStatistics{
private int memstoreLoad = 0;
public void update(ClientProtos.RegionLoadStats currentStats) {
this.memstoreLoad = currentStats.getMemstoreLoad();
}
public int getMemstoreLoadPercent(){
return this.memstoreLoad;
}
}
}

View File

@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
this.connection = conn; this.connection = conn;
this.table = table; this.table = table;
this.row = row; this.row = row;
this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration()); this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null);
this.operationTimeout = conn.getConfiguration().getInt( this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

View File

@ -114,17 +114,23 @@ public final class ResponseConverter {
} }
for (ResultOrException roe : actionResult.getResultOrExceptionList()) { for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
Object responseValue;
if (roe.hasException()) { if (roe.hasException()) {
results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException())); responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) { } else if (roe.hasResult()) {
results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells)); responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
// add the load stats, if we got any
if (roe.hasLoadStats()) {
((Result) responseValue).addResults(roe.getLoadStats());
}
} else if (roe.hasServiceResult()) { } else if (roe.hasServiceResult()) {
results.add(regionName, roe.getIndex(), roe.getServiceResult()); responseValue = roe.getServiceResult();
} else { } else {
// no result & no exception. Unexpected. // no result & no exception. Unexpected.
throw new IllegalStateException("No result & no exception roe=" + roe + throw new IllegalStateException("No result & no exception roe=" + roe +
" for region " + actions.getRegion()); " for region " + actions.getRegion());
} }
results.add(regionName, roe.getIndex(), responseValue);
} }
} }
@ -149,9 +155,11 @@ public final class ResponseConverter {
* @param r * @param r
* @return an action result builder * @return an action result builder
*/ */
public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
ClientProtos.RegionLoadStats stats) {
ResultOrException.Builder builder = ResultOrException.newBuilder(); ResultOrException.Builder builder = ResultOrException.newBuilder();
if (r != null) builder.setResult(r); if (r != null) builder.setResult(r);
if(stats != null) builder.setLoadStats(stats);
return builder; return builder;
} }

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestClientExponentialBackoff {
ServerName server = Mockito.mock(ServerName.class);
byte[] regionname = Bytes.toBytes("region");
@Test
public void testNulls() {
Configuration conf = new Configuration(false);
ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
assertEquals(0, backoff.getBackoffTime(null, null, null));
// server name doesn't matter to calculation, but check it now anyways
assertEquals(0, backoff.getBackoffTime(server, null, null));
assertEquals(0, backoff.getBackoffTime(server, regionname, null));
// check when no stats for the region yet
ServerStatistics stats = new ServerStatistics();
assertEquals(0, backoff.getBackoffTime(server, regionname, stats));
}
@Test
public void testMaxLoad() {
Configuration conf = new Configuration(false);
ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
ServerStatistics stats = new ServerStatistics();
update(stats, 100);
assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
regionname, stats));
// another policy with a different max timeout
long max = 100;
conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max);
ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf);
assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
// test beyond 100 still doesn't exceed the max
update(stats, 101);
assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
regionname, stats));
assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
// and that when we are below 100, its less than the max timeout
update(stats, 99);
assertTrue(backoff.getBackoffTime(server,
regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max);
}
/**
* Make sure that we get results in the order that we expect - backoff for a load of 1 should
* less than backoff for 10, which should be less than that for 50.
*/
@Test
public void testResultOrdering() {
Configuration conf = new Configuration(false);
// make the max timeout really high so we get differentiation between load factors
conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE);
ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
ServerStatistics stats = new ServerStatistics();
long previous = backoff.getBackoffTime(server, regionname, stats);
for (int i = 1; i <= 100; i++) {
update(stats, i);
long next = backoff.getBackoffTime(server, regionname, stats);
assertTrue(
"Previous backoff time" + previous + " >= " + next + ", the next backoff time for " +
"load " + i, previous < next);
previous = next;
}
}
private void update(ServerStatistics stats, int load) {
ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
.setMemstoreLoad
(load).build();
stats.update(regionname, stat);
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestDelayingRunner {
private static final TableName DUMMY_TABLE =
TableName.valueOf("DUMMY_TABLE");
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
private static HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testDelayingRunner() throws Exception{
MultiAction<Row> ma = new MultiAction<Row>();
ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0));
final AtomicLong endTime = new AtomicLong();
final long sleepTime = 1000;
DelayingRunner runner = new DelayingRunner(sleepTime, ma.actions.entrySet().iterator().next());
runner.setRunner(new Runnable() {
@Override
public void run() {
endTime.set(EnvironmentEdgeManager.currentTime());
}
});
long startTime = EnvironmentEdgeManager.currentTime();
runner.run();
long delay = endTime.get() - startTime;
assertTrue("DelayingRunner did not delay long enough", delay >= sleepTime);
assertFalse("DelayingRunner delayed too long", delay > sleepTime + sleepTime*0.2);
}
}

View File

@ -1115,6 +1115,12 @@ public final class HConstants {
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
"hbase.client.fast.fail.interceptor.impl"; "hbase.client.fast.fail.interceptor.impl";
/** Config key for if the server should send backpressure and if the client should listen to
* that backpressure from the server */
public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false;
private HConstants() { private HConstants() {
// Can't be instantiated with this ctor. // Can't be instantiated with this ctor.
} }

View File

@ -26210,6 +26210,482 @@ public final class ClientProtos {
// @@protoc_insertion_point(class_scope:RegionAction) // @@protoc_insertion_point(class_scope:RegionAction)
} }
public interface RegionLoadStatsOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// optional int32 memstoreLoad = 1 [default = 0];
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
boolean hasMemstoreLoad();
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
int getMemstoreLoad();
}
/**
* Protobuf type {@code RegionLoadStats}
*
* <pre>
*
* Statistics about the current load on the region
* </pre>
*/
public static final class RegionLoadStats extends
com.google.protobuf.GeneratedMessage
implements RegionLoadStatsOrBuilder {
// Use RegionLoadStats.newBuilder() to construct.
private RegionLoadStats(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private RegionLoadStats(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final RegionLoadStats defaultInstance;
public static RegionLoadStats getDefaultInstance() {
return defaultInstance;
}
public RegionLoadStats getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private RegionLoadStats(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 8: {
bitField0_ |= 0x00000001;
memstoreLoad_ = input.readInt32();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable
.ensureFieldAccessorsInitialized(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class);
}
public static com.google.protobuf.Parser<RegionLoadStats> PARSER =
new com.google.protobuf.AbstractParser<RegionLoadStats>() {
public RegionLoadStats parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new RegionLoadStats(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<RegionLoadStats> getParserForType() {
return PARSER;
}
private int bitField0_;
// optional int32 memstoreLoad = 1 [default = 0];
public static final int MEMSTORELOAD_FIELD_NUMBER = 1;
private int memstoreLoad_;
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public boolean hasMemstoreLoad() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public int getMemstoreLoad() {
return memstoreLoad_;
}
private void initFields() {
memstoreLoad_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt32(1, memstoreLoad_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(1, memstoreLoad_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) obj;
boolean result = true;
result = result && (hasMemstoreLoad() == other.hasMemstoreLoad());
if (hasMemstoreLoad()) {
result = result && (getMemstoreLoad()
== other.getMemstoreLoad());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
private int memoizedHashCode = 0;
@java.lang.Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasMemstoreLoad()) {
hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER;
hash = (53 * hash) + getMemstoreLoad();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code RegionLoadStats}
*
* <pre>
*
* Statistics about the current load on the region
* </pre>
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable
.ensureFieldAccessorsInitialized(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class);
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
memstoreLoad_ = 0;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
}
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats build() {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.memstoreLoad_ = memstoreLoad_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) return this;
if (other.hasMemstoreLoad()) {
setMemstoreLoad(other.getMemstoreLoad());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// optional int32 memstoreLoad = 1 [default = 0];
private int memstoreLoad_ ;
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public boolean hasMemstoreLoad() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public int getMemstoreLoad() {
return memstoreLoad_;
}
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public Builder setMemstoreLoad(int value) {
bitField0_ |= 0x00000001;
memstoreLoad_ = value;
onChanged();
return this;
}
/**
* <code>optional int32 memstoreLoad = 1 [default = 0];</code>
*
* <pre>
* percent load on the memstore. Guaranteed to be positive, between 0 and 100
* </pre>
*/
public Builder clearMemstoreLoad() {
bitField0_ = (bitField0_ & ~0x00000001);
memstoreLoad_ = 0;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:RegionLoadStats)
}
static {
defaultInstance = new RegionLoadStats(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:RegionLoadStats)
}
public interface ResultOrExceptionOrBuilder public interface ResultOrExceptionOrBuilder
extends com.google.protobuf.MessageOrBuilder { extends com.google.protobuf.MessageOrBuilder {
@ -26286,6 +26762,32 @@ public final class ClientProtos {
* </pre> * </pre>
*/ */
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder(); org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder();
// optional .RegionLoadStats loadStats = 5;
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
boolean hasLoadStats();
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats();
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder();
} }
/** /**
* Protobuf type {@code ResultOrException} * Protobuf type {@code ResultOrException}
@ -26389,6 +26891,19 @@ public final class ClientProtos {
bitField0_ |= 0x00000008; bitField0_ |= 0x00000008;
break; break;
} }
case 42: {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder subBuilder = null;
if (((bitField0_ & 0x00000010) == 0x00000010)) {
subBuilder = loadStats_.toBuilder();
}
loadStats_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(loadStats_);
loadStats_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000010;
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -26533,11 +27048,46 @@ public final class ClientProtos {
return serviceResult_; return serviceResult_;
} }
// optional .RegionLoadStats loadStats = 5;
public static final int LOADSTATS_FIELD_NUMBER = 5;
private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_;
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public boolean hasLoadStats() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() {
return loadStats_;
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() {
return loadStats_;
}
private void initFields() { private void initFields() {
index_ = 0; index_ = 0;
result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance(); result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance();
exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance(); exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance();
serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance(); serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance();
loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -26575,6 +27125,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) { if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(4, serviceResult_); output.writeMessage(4, serviceResult_);
} }
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(5, loadStats_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -26600,6 +27153,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(4, serviceResult_); .computeMessageSize(4, serviceResult_);
} }
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, loadStats_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -26643,6 +27200,11 @@ public final class ClientProtos {
result = result && getServiceResult() result = result && getServiceResult()
.equals(other.getServiceResult()); .equals(other.getServiceResult());
} }
result = result && (hasLoadStats() == other.hasLoadStats());
if (hasLoadStats()) {
result = result && getLoadStats()
.equals(other.getLoadStats());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -26672,6 +27234,10 @@ public final class ClientProtos {
hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER; hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER;
hash = (53 * hash) + getServiceResult().hashCode(); hash = (53 * hash) + getServiceResult().hashCode();
} }
if (hasLoadStats()) {
hash = (37 * hash) + LOADSTATS_FIELD_NUMBER;
hash = (53 * hash) + getLoadStats().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -26783,6 +27349,7 @@ public final class ClientProtos {
getResultFieldBuilder(); getResultFieldBuilder();
getExceptionFieldBuilder(); getExceptionFieldBuilder();
getServiceResultFieldBuilder(); getServiceResultFieldBuilder();
getLoadStatsFieldBuilder();
} }
} }
private static Builder create() { private static Builder create() {
@ -26811,6 +27378,12 @@ public final class ClientProtos {
serviceResultBuilder_.clear(); serviceResultBuilder_.clear();
} }
bitField0_ = (bitField0_ & ~0x00000008); bitField0_ = (bitField0_ & ~0x00000008);
if (loadStatsBuilder_ == null) {
loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
} else {
loadStatsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
return this; return this;
} }
@ -26867,6 +27440,14 @@ public final class ClientProtos {
} else { } else {
result.serviceResult_ = serviceResultBuilder_.build(); result.serviceResult_ = serviceResultBuilder_.build();
} }
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
to_bitField0_ |= 0x00000010;
}
if (loadStatsBuilder_ == null) {
result.loadStats_ = loadStats_;
} else {
result.loadStats_ = loadStatsBuilder_.build();
}
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -26895,6 +27476,9 @@ public final class ClientProtos {
if (other.hasServiceResult()) { if (other.hasServiceResult()) {
mergeServiceResult(other.getServiceResult()); mergeServiceResult(other.getServiceResult());
} }
if (other.hasLoadStats()) {
mergeLoadStats(other.getLoadStats());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -27374,6 +27958,159 @@ public final class ClientProtos {
return serviceResultBuilder_; return serviceResultBuilder_;
} }
// optional .RegionLoadStats loadStats = 5;
private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> loadStatsBuilder_;
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public boolean hasLoadStats() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() {
if (loadStatsBuilder_ == null) {
return loadStats_;
} else {
return loadStatsBuilder_.getMessage();
}
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public Builder setLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) {
if (loadStatsBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
loadStats_ = value;
onChanged();
} else {
loadStatsBuilder_.setMessage(value);
}
bitField0_ |= 0x00000010;
return this;
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public Builder setLoadStats(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder builderForValue) {
if (loadStatsBuilder_ == null) {
loadStats_ = builderForValue.build();
onChanged();
} else {
loadStatsBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000010;
return this;
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public Builder mergeLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) {
if (loadStatsBuilder_ == null) {
if (((bitField0_ & 0x00000010) == 0x00000010) &&
loadStats_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) {
loadStats_ =
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder(loadStats_).mergeFrom(value).buildPartial();
} else {
loadStats_ = value;
}
onChanged();
} else {
loadStatsBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000010;
return this;
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public Builder clearLoadStats() {
if (loadStatsBuilder_ == null) {
loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
onChanged();
} else {
loadStatsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder getLoadStatsBuilder() {
bitField0_ |= 0x00000010;
onChanged();
return getLoadStatsFieldBuilder().getBuilder();
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() {
if (loadStatsBuilder_ != null) {
return loadStatsBuilder_.getMessageOrBuilder();
} else {
return loadStats_;
}
}
/**
* <code>optional .RegionLoadStats loadStats = 5;</code>
*
* <pre>
* current load on the region
* </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>
getLoadStatsFieldBuilder() {
if (loadStatsBuilder_ == null) {
loadStatsBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>(
loadStats_,
getParentForChildren(),
isClean());
loadStats_ = null;
}
return loadStatsBuilder_;
}
// @@protoc_insertion_point(builder_scope:ResultOrException) // @@protoc_insertion_point(builder_scope:ResultOrException)
} }
@ -31066,6 +31803,11 @@ public final class ClientProtos {
private static private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_RegionAction_fieldAccessorTable; internal_static_RegionAction_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_RegionLoadStats_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_RegionLoadStats_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor private static com.google.protobuf.Descriptors.Descriptor
internal_static_ResultOrException_descriptor; internal_static_ResultOrException_descriptor;
private static private static
@ -31180,31 +31922,33 @@ public final class ClientProtos {
"\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" + "\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" +
"rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" + "rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" +
"egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" + "egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" +
"\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001\n\021Resul" + "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017Region" +
"tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + "LoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021R" +
"\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + "esultOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006resul" +
"tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + "t\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.Na" +
"essorServiceResult\"f\n\022RegionActionResult", "meBytesPair\0221\n\016service_result\030\004 \001(\0132\031.Co",
"\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" + "processorServiceResult\022#\n\tloadStats\030\005 \001(" +
"ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" + "\0132\020.RegionLoadStats\"f\n\022RegionActionResul" +
"ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" + "t\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrE" +
"2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" + "xception\022!\n\texception\030\002 \001(\0132\016.NameBytesP" +
"ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" + "air\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(" +
"nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" + "\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\t" +
"ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" + "condition\030\003 \001(\0132\n.Condition\"S\n\rMultiResp" +
"stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCli" + "onse\022/\n\022regionActionResult\030\001 \003(\0132\023.Regio" +
"entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" + "nActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Cons" +
"onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR", "istency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCl",
"esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" + "ientService\022 \n\003Get\022\013.GetRequest\032\014.GetRes" +
"onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" + "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" +
"uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" + "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" +
"ice\022\032.CoprocessorServiceRequest\032\033.Coproc" + "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe" +
"essorServiceResponse\022R\n\027ExecRegionServer" + "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" +
"Service\022\032.CoprocessorServiceRequest\032\033.Co" + "vice\022\032.CoprocessorServiceRequest\032\033.Copro" +
"processorServiceResponse\022&\n\005Multi\022\r.Mult" + "cessorServiceResponse\022R\n\027ExecRegionServe" +
"iRequest\032\016.MultiResponseBB\n*org.apache.h" + "rService\022\032.CoprocessorServiceRequest\032\033.C" +
"adoop.hbase.protobuf.generatedB\014ClientPr" + "oprocessorServiceResponse\022&\n\005Multi\022\r.Mul" +
"otosH\001\210\001\001\240\001\001" "tiRequest\032\016.MultiResponseBB\n*org.apache.",
"hadoop.hbase.protobuf.generatedB\014ClientP" +
"rotosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -31361,26 +32105,32 @@ public final class ClientProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionAction_descriptor, internal_static_RegionAction_descriptor,
new java.lang.String[] { "Region", "Atomic", "Action", }); new java.lang.String[] { "Region", "Atomic", "Action", });
internal_static_ResultOrException_descriptor = internal_static_RegionLoadStats_descriptor =
getDescriptor().getMessageTypes().get(22); getDescriptor().getMessageTypes().get(22);
internal_static_RegionLoadStats_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionLoadStats_descriptor,
new java.lang.String[] { "MemstoreLoad", });
internal_static_ResultOrException_descriptor =
getDescriptor().getMessageTypes().get(23);
internal_static_ResultOrException_fieldAccessorTable = new internal_static_ResultOrException_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ResultOrException_descriptor, internal_static_ResultOrException_descriptor,
new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", }); new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", "LoadStats", });
internal_static_RegionActionResult_descriptor = internal_static_RegionActionResult_descriptor =
getDescriptor().getMessageTypes().get(23); getDescriptor().getMessageTypes().get(24);
internal_static_RegionActionResult_fieldAccessorTable = new internal_static_RegionActionResult_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionActionResult_descriptor, internal_static_RegionActionResult_descriptor,
new java.lang.String[] { "ResultOrException", "Exception", }); new java.lang.String[] { "ResultOrException", "Exception", });
internal_static_MultiRequest_descriptor = internal_static_MultiRequest_descriptor =
getDescriptor().getMessageTypes().get(24); getDescriptor().getMessageTypes().get(25);
internal_static_MultiRequest_fieldAccessorTable = new internal_static_MultiRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiRequest_descriptor, internal_static_MultiRequest_descriptor,
new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", }); new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", });
internal_static_MultiResponse_descriptor = internal_static_MultiResponse_descriptor =
getDescriptor().getMessageTypes().get(25); getDescriptor().getMessageTypes().get(26);
internal_static_MultiResponse_fieldAccessorTable = new internal_static_MultiResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiResponse_descriptor, internal_static_MultiResponse_descriptor,

View File

@ -353,6 +353,14 @@ message RegionAction {
repeated Action action = 3; repeated Action action = 3;
} }
/*
* Statistics about the current load on the region
*/
message RegionLoadStats{
// percent load on the memstore. Guaranteed to be positive, between 0 and 100
optional int32 memstoreLoad = 1 [default = 0];
}
/** /**
* Either a Result or an Exception NameBytesPair (keyed by * Either a Result or an Exception NameBytesPair (keyed by
* exception name whose value is the exception stringified) * exception name whose value is the exception stringified)
@ -366,6 +374,8 @@ message ResultOrException {
optional NameBytesPair exception = 3; optional NameBytesPair exception = 3;
// result if this was a coprocessor service call // result if this was a coprocessor service call
optional CoprocessorServiceResult service_result = 4; optional CoprocessorServiceResult service_result = 4;
// current load on the region
optional RegionLoadStats loadStats = 5;
} }
/** /**

View File

@ -711,7 +711,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try { try {
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>(); List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
Configuration conf = getConf(); Configuration conf = getConf();
boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller() boolean success = RpcRetryingCallerFactory.instantiate(conf,
null).<Boolean> newCaller()
.callWithRetries(svrCallable, Integer.MAX_VALUE); .callWithRetries(svrCallable, Integer.MAX_VALUE);
if (!success) { if (!success) {
LOG.warn("Attempt to bulk load region containing " LOG.warn("Attempt to bulk load region containing "

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -124,6 +125,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -553,6 +555,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
private final MetricsRegion metricsRegion; private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper; private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability; private final Durability durability;
private final boolean regionStatsEnabled;
/** /**
* HRegion constructor. This constructor should only be used for testing and * HRegion constructor. This constructor should only be used for testing and
@ -693,6 +696,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
configurationManager = Optional.absent(); configurationManager = Optional.absent();
// disable stats tracking system tables, but check the config for everything else
this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
} }
void setHTableSpecificConf() { void setHTableSpecificConf() {
@ -5295,18 +5305,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return results; return results;
} }
public void mutateRow(RowMutations rm) throws IOException { public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException {
// Don't need nonces here - RowMutations only supports puts and deletes // Don't need nonces here - RowMutations only supports puts and deletes
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
} }
/** /**
* Perform atomic mutations within the region w/o nonces. * Perform atomic mutations within the region w/o nonces.
* See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
*/ */
public void mutateRowsWithLocks(Collection<Mutation> mutations, public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock) throws IOException { Collection<byte[]> rowsToLock) throws IOException {
mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
} }
/** /**
@ -5321,10 +5331,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* <code>rowsToLock</code> is sorted in order to avoid deadlocks. * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
* @throws IOException * @throws IOException
*/ */
public void mutateRowsWithLocks(Collection<Mutation> mutations, public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException { Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
processRowsWithLocks(proc, -1, nonceGroup, nonce); processRowsWithLocks(proc, -1, nonceGroup, nonce);
return getRegionStats();
}
/**
* @return the current load statistics for the the region
*/
public ClientProtos.RegionLoadStats getRegionStats() {
if (!regionStatsEnabled) {
return null;
}
ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
.memstoreFlushSize)));
return stats.build();
} }
/** /**

View File

@ -258,8 +258,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
private static ResultOrException getResultOrException( private static ResultOrException getResultOrException(
final ClientProtos.Result r, final int index) { final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
return getResultOrException(ResponseConverter.buildActionResult(r), index); return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
} }
private static ResultOrException getResultOrException(final Exception e, final int index) { private static ResultOrException getResultOrException(final Exception e, final int index) {
@ -356,7 +356,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param cellScanner if non-null, the mutation data -- the Cell content. * @param cellScanner if non-null, the mutation data -- the Cell content.
* @throws IOException * @throws IOException
*/ */
private void mutateRows(final HRegion region, final List<ClientProtos.Action> actions, private ClientProtos.RegionLoadStats mutateRows(final HRegion region,
final List<ClientProtos.Action> actions,
final CellScanner cellScanner) throws IOException { final CellScanner cellScanner) throws IOException {
if (!region.getRegionInfo().isMetaTable()) { if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory(); regionServer.cacheFlusher.reclaimMemStoreMemory();
@ -382,7 +383,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
} }
} }
region.mutateRow(rm); return region.mutateRow(rm);
} }
/** /**
@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
case SUCCESS: case SUCCESS:
builder.addResultOrException(getResultOrException( builder.addResultOrException(getResultOrException(
ClientProtos.Result.getDefaultInstance(), index)); ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats()));
break; break;
} }
} }
@ -1815,7 +1816,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
processed = checkAndRowMutate(region, regionAction.getActionList(), processed = checkAndRowMutate(region, regionAction.getActionList(),
cellScanner, row, family, qualifier, compareOp, comparator); cellScanner, row, family, qualifier, compareOp, comparator);
} else { } else {
mutateRows(region, regionAction.getActionList(), cellScanner); ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
cellScanner);
// add the stats to the request
if(stats != null) {
responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
.addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
}
processed = Boolean.TRUE; processed = Boolean.TRUE;
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -162,7 +162,7 @@ public class WALEditsReplaySink {
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Entry> entries) throws IOException { final List<Entry> entries) throws IOException {
try { try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
ReplayServerCallable<ReplicateWALEntryResponse> callable = ReplayServerCallable<ReplicateWALEntryResponse> callable =
new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc, new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
regionInfo, entries); regionInfo, entries);

View File

@ -154,7 +154,7 @@ public class HConnectionTestingUtility {
Mockito.doNothing().when(c).decCount(); Mockito.doNothing().when(c).decCount();
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf, RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR)); RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
HTableInterface t = Mockito.mock(HTableInterface.class); HTableInterface t = Mockito.mock(HTableInterface.class);
Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t); Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t);
ResultScanner rs = Mockito.mock(ResultScanner.class); ResultScanner rs = Mockito.mock(ResultScanner.class);

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Test that we can actually send and use region metrics to slowdown client writes
*/
@Category(MediumTests.class)
public class TestClientPushback {
private static final Log LOG = LogFactory.getLog(TestClientPushback.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] tableName = Bytes.toBytes("client-pushback");
private static final byte[] family = Bytes.toBytes("f");
private static final byte[] qualifier = Bytes.toBytes("q");
private static long flushSizeBytes = 1024;
@BeforeClass
public static void setupCluster() throws Exception{
Configuration conf = UTIL.getConfiguration();
// enable backpressure
conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
// use the exponential backoff policy
conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
ClientBackoffPolicy.class);
// turn the memstore size way down so we don't need to write a lot to see changes in memstore
// load
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
// ensure we block the flushes when we are double that flushsize
conf.setLong("hbase.hregion.memstore.block.multiplier", 2);
UTIL.startMiniCluster();
UTIL.createTable(tableName, family);
}
@AfterClass
public static void teardownCluster() throws Exception{
UTIL.shutdownMiniCluster();
}
@Test(timeout=60000)
public void testClientTracksServerPushback() throws Exception{
Configuration conf = UTIL.getConfiguration();
TableName tablename = TableName.valueOf(tableName);
Connection conn = ConnectionFactory.createConnection(conf);
HTable table = (HTable) conn.getTable(tablename);
//make sure we flush after each put
table.setAutoFlushTo(true);
// write some data
Put p = new Put(Bytes.toBytes("row"));
p.add(family, qualifier, Bytes.toBytes("value1"));
table.put(p);
// get the stats for the region hosting our table
ClusterConnection connection = table.connection;
ClientBackoffPolicy backoffPolicy = connection.getBackoffPolicy();
assertTrue("Backoff policy is not correctly configured",
backoffPolicy instanceof ExponentialClientBackoffPolicy);
ServerStatisticTracker stats = connection.getStatisticsTracker();
assertNotNull( "No stats configured for the client!", stats);
HRegion region = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename).get(0);
// get the names so we can query the stats
ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
byte[] regionName = region.getRegionName();
// check to see we found some load on the memstore
ServerStatistics serverStats = stats.getServerStatsForTesting(server);
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
int load = regionStats.getMemstoreLoadPercent();
if (load < 11) {
assertEquals("Load on memstore too low", 11, load);
}
// check that the load reported produces a nonzero delay
long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
assertNotEquals("Reported load does not produce a backoff", backoffTime, 0);
LOG.debug("Backoff calculated for " + region.getRegionNameAsString() + " @ " + server +
" is " + backoffTime);
// Reach into the connection and submit work directly to AsyncProcess so we can
// monitor how long the submission was delayed via a callback
List<Row> ops = new ArrayList<Row>(1);
ops.add(p);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
table.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
@Override
public void update(byte[] region, byte[] row, Result result) {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();
}
}, true);
// Currently the ExponentialClientBackoffPolicy under these test conditions
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
// time reported by above debug logging has significantly deviated.
latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
}
}