From a411227b0ebf78b4ee8ae7179e162b54734e77de Mon Sep 17 00:00:00 2001 From: Jesse Yates Date: Tue, 28 Oct 2014 16:14:16 -0700 Subject: [PATCH] HBASE-5162 Basic client pushback mechanism Instead of just blocking the client for 90 seconds when the region gets too busy, it now sends along region load stats to the client so the client can know how busy the server is. Currently, its just the load on the memstore, but it can be extended for other stats (e.g. cpu, general memory, etc.). It is then up to the client to decide if it wants to listen to these stats. By default, the client ignores the stats, but it can easily be toggled to the built-in exponential back-off or users can plug in their own back-off implementations --- .../hadoop/hbase/client/AsyncProcess.java | 120 ++- .../hbase/client/ClusterConnection.java | 12 +- .../hbase/client/ConnectionAdapter.java | 11 + .../hbase/client/ConnectionManager.java | 26 +- .../hadoop/hbase/client/DelayingRunner.java | 116 +++ .../apache/hadoop/hbase/client/HTable.java | 5 +- .../hadoop/hbase/client/MultiAction.java | 17 +- .../apache/hadoop/hbase/client/Result.java | 20 +- .../hadoop/hbase/client/ResultStatsUtil.java | 76 ++ .../hbase/client/RpcRetryingCaller.java | 217 +---- .../client/RpcRetryingCallerFactory.java | 42 +- .../hbase/client/RpcRetryingCallerImpl.java | 238 +++++ .../hbase/client/ServerStatisticTracker.java | 74 ++ .../StatsTrackingRpcRetryingCaller.java | 77 ++ .../client/backoff/ClientBackoffPolicy.java | 42 + .../backoff/ClientBackoffPolicyFactory.java | 59 ++ .../ExponentialClientBackoffPolicy.java | 71 ++ .../client/backoff/ServerStatistics.java | 68 ++ .../ipc/RegionCoprocessorRpcChannel.java | 2 +- .../hbase/protobuf/ResponseConverter.java | 16 +- .../hadoop/hbase/client/TestAsyncProcess.java | 6 +- .../client/TestClientExponentialBackoff.java | 110 +++ .../client/TestFastFailWithoutTestUtil.java | 2 +- .../org/apache/hadoop/hbase/HConstants.java | 6 + .../protobuf/generated/ClientProtos.java | 810 +++++++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 10 + .../mapreduce/LoadIncrementalHFiles.java | 3 +- .../hadoop/hbase/regionserver/HRegion.java | 34 +- .../hbase/regionserver/RSRpcServices.java | 18 +- .../regionserver/wal/WALEditsReplaySink.java | 2 +- .../client/HConnectionTestingUtility.java | 2 +- .../hbase/client/TestClientPushback.java | 94 ++ .../hbase/client/TestReplicasClient.java | 2 +- .../TestRegionReplicaReplicationEndpoint.java | 3 +- 34 files changed, 2110 insertions(+), 301 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index e0c14a6460d..8b1db8f881d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; @@ -313,7 +315,8 @@ class AsyncProcess { * Uses default ExecutorService for this AP (must have been created with one). */ public AsyncRequestFuture submit(TableName tableName, List rows, - boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { + boolean atLeastOne, Batch.Callback callback, boolean needResults) + throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } @@ -374,7 +377,7 @@ class AsyncProcess { locationErrors = new ArrayList(); locationErrorRows = new ArrayList(); LOG.error("Failed to get region location ", ex); - // This action failed before creating ars. Add it to retained but do not add to submit list. + // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. retainedActions.add(new Action(r, ++posInList)); locationErrors.add(ex); @@ -918,14 +921,12 @@ class AsyncProcess { return loc; } - - /** * Send a multi action structure to the servers, after a delay depending on the attempt * number. Asynchronous. * * @param actionsByServer the actions structured by regions - * @param numAttempt the attempt number. + * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ private void sendMultiAction(Map> actionsByServer, @@ -935,33 +936,98 @@ class AsyncProcess { int actionsRemaining = actionsByServer.size(); // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry> e : actionsByServer.entrySet()) { - final ServerName server = e.getKey(); - final MultiAction multiAction = e.getValue(); + ServerName server = e.getKey(); + MultiAction multiAction = e.getValue(); incTaskCounters(multiAction.getRegions(), server); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server)); - if ((--actionsRemaining == 0) && reuseThread) { - runnable.run(); - } else { - try { - pool.submit(runnable); - } catch (RejectedExecutionException ree) { - // This should never happen. But as the pool is provided by the end user, let's secure - // this a little. - decTaskCounters(multiAction.getRegions(), server); - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + server.getServerName(), ree); - // We're likely to fail again, but this will increment the attempt counter, so it will - // finish. - receiveGlobalFailure(multiAction, server, numAttempt, ree); + Collection runnables = getNewMultiActionRunnable(server, multiAction, + numAttempt); + // make sure we correctly count the number of runnables before we try to reuse the send + // thread, in case we had to split the request into different runnables because of backoff + if (runnables.size() > actionsRemaining) { + actionsRemaining = runnables.size(); + } + + // run all the runnables + for (Runnable runnable : runnables) { + if ((--actionsRemaining == 0) && reuseThread) { + runnable.run(); + } else { + try { + pool.submit(runnable); + } catch (RejectedExecutionException ree) { + // This should never happen. But as the pool is provided by the end user, let's secure + // this a little. + decTaskCounters(multiAction.getRegions(), server); + LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName(), ree); + // We're likely to fail again, but this will increment the attempt counter, so it will + // finish. + receiveGlobalFailure(multiAction, server, numAttempt, ree); + } } } } + if (actionsForReplicaThread != null) { startWaitingForReplicaCalls(actionsForReplicaThread); } } + private Collection getNewMultiActionRunnable(ServerName server, + MultiAction multiAction, + int numAttempt) { + // no stats to manage, just do the standard action + if (AsyncProcess.this.connection.getStatisticsTracker() == null) { + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server))); + } + + // group the actions by the amount of delay + Map actions = new HashMap(multiAction + .size()); + + // split up the actions + for (Map.Entry>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(server, e.getKey()); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List toReturn = new ArrayList(actions.size()); + for (DelayingRunner runner : actions.values()) { + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + + } + return toReturn; + } + + /** + * @param server server location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(ServerName server, byte[] regionName) { + ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(server); + return AsyncProcess.this.connection.getBackoffPolicy() + .getBackoffTime(server, regionName, stats); + } + /** * Starts waiting to issue replica calls on a different thread; or issues them immediately. */ @@ -1169,6 +1235,13 @@ class AsyncProcess { ++failed; } } else { + // update the stats about the region, if its a user table. We don't want to slow down + // updates to meta tables, especially from internal updates (master, etc). + if (AsyncProcess.this.connection.getStatisticsTracker() != null) { + result = ResultStatsUtil.updateStats(result, + AsyncProcess.this.connection.getStatisticsTracker(), server, regionName); + } + if (callback != null) { try { //noinspection unchecked @@ -1498,7 +1571,6 @@ class AsyncProcess { } } - @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected AsyncRequestFutureImpl createAsyncRequestFuture( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 8989725729e..45b99ebe071 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; @@ -288,5 +289,14 @@ public interface ClusterConnection extends HConnection { * @return true if this is a managed connection. */ boolean isManaged(); -} + /** + * @return the current statistics tracker associated with this connection + */ + ServerStatisticTracker getStatisticsTracker(); + + /** + * @return the configured client backoff policy + */ + ClientBackoffPolicy getBackoffPolicy(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 001132835be..53c1271c265 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -447,4 +448,14 @@ abstract class ConnectionAdapter implements ClusterConnection { public boolean isManaged() { return wrappedConnection.isManaged(); } + + @Override + public ServerStatisticTracker getStatisticsTracker() { + return wrappedConnection.getStatisticsTracker(); + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return wrappedConnection.getBackoffPolicy(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index f813ebd2f81..acb64c8d80b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -541,6 +543,8 @@ class ConnectionManager { final int rpcTimeout; private NonceGenerator nonceGenerator = null; private final AsyncProcess asyncProcess; + // single tracker per connection + private final ServerStatisticTracker stats; private volatile boolean closed; private volatile boolean aborted; @@ -596,6 +600,8 @@ class ConnectionManager { */ Registry registry; + private final ClientBackoffPolicy backoffPolicy; + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { this(conf, managed, null, null); } @@ -670,9 +676,11 @@ class ConnectionManager { } else { this.nonceGenerator = new NoNonceGenerator(); } + stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); } @Override @@ -2207,7 +2215,8 @@ class ConnectionManager { protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. return new AsyncProcess(this, conf, this.batchPool, - RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf)); + RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false, + RpcControllerFactory.instantiate(conf)); } @Override @@ -2215,6 +2224,16 @@ class ConnectionManager { return asyncProcess; } + @Override + public ServerStatisticTracker getStatisticsTracker() { + return this.stats; + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return this.backoffPolicy; + } + /* * Return the number of cached region for a table. It will only be called * from a unit test. @@ -2506,7 +2525,8 @@ class ConnectionManager { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor); + return RpcRetryingCallerFactory + .instantiate(conf, this.interceptor, this.getStatisticsTracker()); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java new file mode 100644 index 00000000000..83c73b6a36a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.List; +import java.util.Map; + +/** + * A wrapper for a runnable for a group of actions for a single regionserver. + *

+ * This can be used to build up the actions that should be taken and then + *

+ *

+ * This class exists to simulate using a ScheduledExecutorService with just a regular + * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could + * only be removed if we change the expectations in HTable around the pool the client is able to + * pass in and even if we deprecate the current APIs would require keeping this class around + * for the interim to bridge between the legacy ExecutorServices and the scheduled pool. + *

+ */ +@InterfaceAudience.Private +public class DelayingRunner implements Runnable { + private static final Log LOG = LogFactory.getLog(DelayingRunner.class); + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + private long sleepTime; + private MultiAction actions = new MultiAction(); + private Runnable runnable; + + public DelayingRunner(long sleepTime, Map.Entry>> e) { + this.sleepTime = sleepTime; + add(e); + } + + public void setRunner(Runnable runner) { + this.runnable = runner; + } + + @Override + public void run() { + if (!sleep()) { + LOG.warn( + "Interrupted while sleeping for expected sleep time " + sleepTime + " ms"); + } + //TODO maybe we should consider switching to a listenableFuture for the actual callable and + // then handling the results/errors as callbacks. That way we can decrement outstanding tasks + // even if we get interrupted here, but for now, we still need to run so we decrement the + // outstanding tasks + this.runnable.run(); + } + + /** + * Sleep for an expected amount of time. + *

+ * This is nearly a copy of what the Sleeper does, but with the ability to know if you + * got interrupted while sleeping. + *

+ * + * @return true if the sleep completely entirely successfully, + * but otherwise false if the sleep was interrupted. + */ + private boolean sleep() { + long now = EnvironmentEdgeManager.currentTime(); + long startTime = now; + long waitTime = sleepTime; + while (waitTime > 0) { + long woke = -1; + try { + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } + woke = EnvironmentEdgeManager.currentTime(); + } catch (InterruptedException iex) { + return false; + } + // Recalculate waitTime. + woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke; + waitTime = waitTime - (woke - startTime); + } + return true; + } + + public void add(Map.Entry>> e) { + actions.add(e.getKey(), e.getValue()); + } + + public MultiAction getActions() { + return actions; + } + + public long getSleepTime() { + return sleepTime; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fd0470af21d..0508fce5e4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1887,8 +1887,9 @@ public class HTable implements HTableInterface, RegionLocator { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, - RpcRetryingCallerFactory.instantiate(configuration), true, - RpcControllerFactory.instantiate(configuration)); + RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + true, RpcControllerFactory.instantiate(configuration)); + AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 8f1dc4d0e1b..16ab8524cdd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,12 +69,24 @@ public final class MultiAction { * @param a */ public void add(byte[] regionName, Action a) { + add(regionName, Arrays.asList(a)); + } + + /** + * Add an Action to this container based on it's regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region. + * + * @param regionName + * @param actionList list of actions to add for the region + */ + public void add(byte[] regionName, List> actionList){ List> rsActions = actions.get(regionName); if (rsActions == null) { - rsActions = new ArrayList>(); + rsActions = new ArrayList>(actionList.size()); actions.put(regionName, rsActions); } - rsActions.add(a); + rsActions.addAll(actionList); } public void setNonceGroup(long nonceGroup) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 8303121cc36..08d9b801627 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -94,6 +95,7 @@ public class Result implements CellScannable, CellScanner { * Index for where we are when Result is acting as a {@link CellScanner}. */ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; + private ClientProtos.RegionLoadStats stats; /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. @@ -794,4 +796,20 @@ public class Result implements CellScannable, CellScanner { public boolean isStale() { return stale; } -} + + /** + * Add load information about the region to the information about the result + * @param loadStats statistics about the current region from which this was returned + */ + public void addResults(ClientProtos.RegionLoadStats loadStats) { + this.stats = loadStats; + } + + /** + * @return the associated statistics about the region from which this was returned. Can be + * null if stats are disabled. + */ + public ClientProtos.RegionLoadStats getStats() { + return stats; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java new file mode 100644 index 00000000000..3caa63e4a35 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Result} with some statistics about the server/region status + */ +@InterfaceAudience.Private +public final class ResultStatsUtil { + + private ResultStatsUtil() { + //private ctor for util class + } + + /** + * Update the stats for the specified region if the result is an instance of {@link + * ResultStatsUtil} + * + * @param r object that contains the result and possibly the statistics about the region + * @param serverStats stats tracker to update from the result + * @param server server from which the result was obtained + * @param regionName full region name for the stats. + * @return the underlying {@link Result} if the passed result is an {@link + * ResultStatsUtil} or just returns the result; + */ + public static T updateStats(T r, ServerStatisticTracker serverStats, + ServerName server, byte[] regionName) { + if (!(r instanceof Result)) { + return r; + } + Result result = (Result) r; + // early exit if there are no stats to collect + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return r; + } + + if (regionName != null) { + serverStats.updateRegionStats(server, regionName, stats); + } + + return r; + } + + public static T updateStats(T r, ServerStatisticTracker stats, + HRegionLocation regionLocation) { + byte[] regionName = null; + ServerName server = null; + if (regionLocation != null) { + server = regionLocation.getServerName(); + regionName = regionLocation.getRegionInfo().getRegionName(); + } + + return updateStats(r, stats, server, regionName); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index a2c4d991d7f..807c227f12c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,93 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.classification.InterfaceStability; -import com.google.protobuf.ServiceException; +import java.io.IOException; /** - * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client - * threadlocal outstanding timeouts as so we don't persist too much. - * Dynamic rather than static so can set the generic appropriately. * - * This object has a state. It should not be used by in parallel by different threads. - * Reusing it is possible however, even between multiple threads. However, the user will - * have to manage the synchronization on its side: there is no synchronization inside the class. */ -@InterfaceAudience.Private -public class RpcRetryingCaller { - public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); - /** - * When we started making calls. - */ - private long globalStartTime; - /** - * Start and end times for a single call. - */ - private final static int MIN_RPC_TIMEOUT = 2000; - /** How many retries are allowed before we start to log */ - private final int startLogErrorsCnt; - - private final long pause; - private final int retries; - private final AtomicBoolean cancelled = new AtomicBoolean(false); - private final RetryingCallerInterceptor interceptor; - private final RetryingCallerInterceptorContext context; - - public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); - } - - public RpcRetryingCaller(long pause, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { - this.pause = pause; - this.retries = retries; - this.interceptor = interceptor; - context = interceptor.createEmptyContext(); - this.startLogErrorsCnt = startLogErrorsCnt; - } - - private int getRemainingTime(int callTimeout) { - if (callTimeout <= 0) { - return 0; - } else { - if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; - int remainingTime = (int) (callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); - if (remainingTime < MIN_RPC_TIMEOUT) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remainingTime = MIN_RPC_TIMEOUT; - } - return remainingTime; - } - } - - public void cancel(){ - cancelled.set(true); - synchronized (cancelled){ - cancelled.notifyAll(); - } - } +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RpcRetryingCaller { + void cancel(); /** * Retries if invocation fails. @@ -112,75 +38,8 @@ public class RpcRetryingCaller { * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ - public T callWithRetries(RetryingCallable callable, int callTimeout) - throws IOException, RuntimeException { - List exceptions = - new ArrayList(); - this.globalStartTime = EnvironmentEdgeManager.currentTime(); - context.clear(); - for (int tries = 0;; tries++) { - long expectedSleep; - try { - callable.prepare(tries != 0); // if called with false, check table status on ZK - interceptor.intercept(context.prepare(callable, tries)); - return callable.call(getRemainingTime(callTimeout)); - } catch (PreemptiveFastFailException e) { - throw e; - } catch (Throwable t) { - ExceptionUtil.rethrowIfInterrupt(t); - if (tries > startLogErrorsCnt) { - LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + - (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " - + "cancelled=" + cancelled.get() + ", msg=" - + callable.getExceptionMessageAdditionalDetail()); - } - - // translateException throws exception when should not retry: i.e. when request is bad. - interceptor.handleFailure(context, t); - t = translateException(t); - callable.throwable(t, retries != 1); - RetriesExhaustedException.ThrowableWithExtraContext qt = - new RetriesExhaustedException.ThrowableWithExtraContext(t, - EnvironmentEdgeManager.currentTime(), toString()); - exceptions.add(qt); - if (tries >= retries - 1) { - throw new RetriesExhaustedException(tries, exceptions); - } - // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be - // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time - expectedSleep = callable.sleep(pause, tries + 1); - - // If, after the planned sleep, there won't be enough time left, we stop now. - long duration = singleCallDuration(expectedSleep); - if (duration > callTimeout) { - String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + - ": " + callable.getExceptionMessageAdditionalDetail(); - throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); - } - } finally { - interceptor.updateFailureInfo(context); - } - try { - if (expectedSleep > 0) { - synchronized (cancelled) { - if (cancelled.get()) return null; - cancelled.wait(expectedSleep); - } - } - if (cancelled.get()) return null; - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); - } - } - } - - /** - * @return Calculate how long a single call took - */ - private long singleCallDuration(final long expectedSleep) { - return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; - } + T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException; /** * Call the server once only. @@ -191,62 +50,6 @@ public class RpcRetryingCaller { * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ - public T callWithoutRetries(RetryingCallable callable, int callTimeout) - throws IOException, RuntimeException { - // The code of this method should be shared with withRetries. - this.globalStartTime = EnvironmentEdgeManager.currentTime(); - try { - callable.prepare(false); - return callable.call(callTimeout); - } catch (Throwable t) { - Throwable t2 = translateException(t); - ExceptionUtil.rethrowIfInterrupt(t2); - // It would be nice to clear the location cache here. - if (t2 instanceof IOException) { - throw (IOException)t2; - } else { - throw new RuntimeException(t2); - } - } - } - - /** - * Get the good or the remote exception if any, throws the DoNotRetryIOException. - * @param t the throwable to analyze - * @return the translated exception, if it's not a DoNotRetryIOException - * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. - */ - static Throwable translateException(Throwable t) throws DoNotRetryIOException { - if (t instanceof UndeclaredThrowableException) { - if (t.getCause() != null) { - t = t.getCause(); - } - } - if (t instanceof RemoteException) { - t = ((RemoteException)t).unwrapRemoteException(); - } - if (t instanceof LinkageError) { - throw new DoNotRetryIOException(t); - } - if (t instanceof ServiceException) { - ServiceException se = (ServiceException)t; - Throwable cause = se.getCause(); - if (cause != null && cause instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)cause; - } - // Don't let ServiceException out; its rpc specific. - t = cause; - // t could be a RemoteException so go aaround again. - translateException(t); - } else if (t instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)t; - } - return t; - } - - @Override - public String toString() { - return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + - ", pause=" + pause + ", retries=" + retries + '}'; - } + T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 9f059977c2b..6f2760fcfa1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory { private final int retries; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + private final boolean enableBackPressure; + private ServerStatisticTracker stats; public RpcRetryingCallerFactory(Configuration conf) { this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); @@ -49,27 +51,53 @@ public class RpcRetryingCallerFactory { startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; + enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + } + + /** + * Set the tracker that should be used for tracking statistics about the server + */ + public void setStatisticTracker(ServerStatisticTracker statisticTracker) { + this.stats = statisticTracker; } public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - return new RpcRetryingCaller(pause, retries, interceptor, startLogErrorsCnt); + RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, + startLogErrorsCnt); + + // wrap it with stats, if we are tracking them + if (enableBackPressure && this.stats != null) { + caller = new StatsTrackingRpcRetryingCaller(caller, this.stats); + } + + return caller; } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - + public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor) { + ServerStatisticTracker stats) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { return new RpcRetryingCallerFactory(configuration, interceptor); } - return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); + RpcRetryingCallerFactory factory = ReflectionUtils.instantiateWithCustomCtor( + rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); + + // setting for backwards compat with existing caller factories, rather than in the ctor + factory.setStatisticTracker(stats); + return factory; } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java new file mode 100644 index 00000000000..1d037bc7b9b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -0,0 +1,238 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.protobuf.ServiceException; + +/** + * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client + * threadlocal outstanding timeouts as so we don't persist too much. + * Dynamic rather than static so can set the generic appropriately. + * + * This object has a state. It should not be used by in parallel by different threads. + * Reusing it is possible however, even between multiple threads. However, the user will + * have to manage the synchronization on its side: there is no synchronization inside the class. + */ +@InterfaceAudience.Private +public class RpcRetryingCallerImpl implements RpcRetryingCaller { + public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class); + /** + * When we started making calls. + */ + private long globalStartTime; + /** + * Start and end times for a single call. + */ + private final static int MIN_RPC_TIMEOUT = 2000; + /** How many retries are allowed before we start to log */ + private final int startLogErrorsCnt; + + private final long pause; + private final int retries; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final RetryingCallerInterceptor interceptor; + private final RetryingCallerInterceptorContext context; + + public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { + this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); + } + + public RpcRetryingCallerImpl(long pause, int retries, + RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { + this.pause = pause; + this.retries = retries; + this.interceptor = interceptor; + context = interceptor.createEmptyContext(); + this.startLogErrorsCnt = startLogErrorsCnt; + } + + private int getRemainingTime(int callTimeout) { + if (callTimeout <= 0) { + return 0; + } else { + if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; + int remainingTime = (int) (callTimeout - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + if (remainingTime < MIN_RPC_TIMEOUT) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. + remainingTime = MIN_RPC_TIMEOUT; + } + return remainingTime; + } + } + + @Override + public void cancel(){ + cancelled.set(true); + synchronized (cancelled){ + cancelled.notifyAll(); + } + } + + @Override + public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + List exceptions = + new ArrayList(); + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + context.clear(); + for (int tries = 0;; tries++) { + long expectedSleep; + try { + callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); + return callable.call(getRemainingTime(callTimeout)); + } catch (PreemptiveFastFailException e) { + throw e; + } catch (Throwable t) { + ExceptionUtil.rethrowIfInterrupt(t); + if (tries > startLogErrorsCnt) { + LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " + + "cancelled=" + cancelled.get() + ", msg=" + + callable.getExceptionMessageAdditionalDetail()); + } + + // translateException throws exception when should not retry: i.e. when request is bad. + interceptor.handleFailure(context, t); + t = translateException(t); + callable.throwable(t, retries != 1); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(t, + EnvironmentEdgeManager.currentTime(), toString()); + exceptions.add(qt); + if (tries >= retries - 1) { + throw new RetriesExhaustedException(tries, exceptions); + } + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be + // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + expectedSleep = callable.sleep(pause, tries + 1); + + // If, after the planned sleep, there won't be enough time left, we stop now. + long duration = singleCallDuration(expectedSleep); + if (duration > callTimeout) { + String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + + ": " + callable.getExceptionMessageAdditionalDetail(); + throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); + } + } finally { + interceptor.updateFailureInfo(context); + } + try { + if (expectedSleep > 0) { + synchronized (cancelled) { + if (cancelled.get()) return null; + cancelled.wait(expectedSleep); + } + } + if (cancelled.get()) return null; + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); + } + } + } + + /** + * @return Calculate how long a single call took + */ + private long singleCallDuration(final long expectedSleep) { + return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; + } + + @Override + public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + // The code of this method should be shared with withRetries. + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + try { + callable.prepare(false); + return callable.call(callTimeout); + } catch (Throwable t) { + Throwable t2 = translateException(t); + ExceptionUtil.rethrowIfInterrupt(t2); + // It would be nice to clear the location cache here. + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } + } + } + + /** + * Get the good or the remote exception if any, throws the DoNotRetryIOException. + * @param t the throwable to analyze + * @return the translated exception, if it's not a DoNotRetryIOException + * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. + */ + static Throwable translateException(Throwable t) throws DoNotRetryIOException { + if (t instanceof UndeclaredThrowableException) { + if (t.getCause() != null) { + t = t.getCause(); + } + } + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); + } + if (t instanceof LinkageError) { + throw new DoNotRetryIOException(t); + } + if (t instanceof ServiceException) { + ServiceException se = (ServiceException)t; + Throwable cause = se.getCause(); + if (cause != null && cause instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)cause; + } + // Don't let ServiceException out; its rpc specific. + t = cause; + // t could be a RemoteException so go aaround again. + translateException(t); + } else if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)t; + } + return t; + } + + @Override + public String toString() { + return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + + ", pause=" + pause + ", retries=" + retries + '}'; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java new file mode 100644 index 00000000000..0c7b683d79a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the statistics for multiple regions + */ +@InterfaceAudience.Private +public class ServerStatisticTracker { + + private final Map stats = + new ConcurrentHashMap(); + + public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + currentStats) { + ServerStatistics stat = stats.get(server); + + if (stat == null) { + // create a stats object and update the stats + synchronized (this) { + stat = stats.get(server); + // we don't have stats for that server yet, so we need to make some + if (stat == null) { + stat = new ServerStatistics(); + stats.put(server, stat); + } + } + } + stat.update(region, currentStats); + } + + public ServerStatistics getStats(ServerName server) { + return this.stats.get(server); + } + + public static ServerStatisticTracker create(Configuration conf) { + if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) { + return null; + } + return new ServerStatisticTracker(); + } + + @VisibleForTesting + ServerStatistics getServerStatsForTesting(ServerName server) { + return stats.get(server); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java new file mode 100644 index 00000000000..cec0ee59dec --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return, + * if stats are available + */ +@InterfaceAudience.Private +public class StatsTrackingRpcRetryingCaller implements RpcRetryingCaller { + private final ServerStatisticTracker stats; + private final RpcRetryingCaller delegate; + + public StatsTrackingRpcRetryingCaller(RpcRetryingCaller delegate, + ServerStatisticTracker stats) { + this.delegate = delegate; + this.stats = stats; + } + + @Override + public void cancel() { + delegate.cancel(); + } + + @Override + public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + T result = delegate.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + @Override + public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + T result = delegate.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + private T updateStatsAndUnwrap(T result, RetryingCallable callable) { + // don't track stats about requests that aren't to regionservers + if (!(callable instanceof RegionServerCallable)) { + return result; + } + + // mutli-server callables span multiple regions, so they don't have a location, + // but they are region server callables, so we have to handle them when we process the + // result, not in here + if (callable instanceof MultiServerCallable) { + return result; + } + + // update the stats for the single server callable + RegionServerCallable regionCallable = (RegionServerCallable) callable; + HRegionLocation location = regionCallable.getLocation(); + return ResultStatsUtil.updateStats(result, stats, location); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java new file mode 100644 index 00000000000..94e434f1c97 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Configurable policy for the amount of time a client should wait for a new request to the + * server when given the server load statistics. + *

+ * Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration} + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ClientBackoffPolicy { + + public static final String BACKOFF_POLICY_CLASS = + "hbase.client.statistics.backoff-policy"; + + /** + * @return the number of ms to wait on the client based on the + */ + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java new file mode 100644 index 00000000000..879a0e22c0d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ClientBackoffPolicyFactory { + + private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class); + + private ClientBackoffPolicyFactory() { + } + + public static ClientBackoffPolicy create(Configuration conf) { + // create the backoff policy + String className = + conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class + .getName()); + return ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); + } + + /** + * Default backoff policy that doesn't create any backoff for the client, regardless of load + */ + public static class NoBackoffPolicy implements ClientBackoffPolicy { + public NoBackoffPolicy(Configuration conf){ + // necessary to meet contract + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + return 0; + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java new file mode 100644 index 00000000000..6e75670227e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple exponential backoff policy on for the client that uses a percent^4 times the + * max backoff to generate the backoff time. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { + + private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class); + + private static final long ONE_MINUTE = 60 * 1000; + public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE; + public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max"; + private long maxBackoff; + + public ExponentialClientBackoffPolicy(Configuration conf) { + this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF); + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + // no stats for the server yet, so don't backoff + if (stats == null) { + return 0; + } + + ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region); + // no stats for the region yet - don't backoff + if (regionStats == null) { + return 0; + } + + // square the percent as a value less than 1. Closer we move to 100 percent, + // the percent moves to 1, but squaring causes the exponential curve + double percent = regionStats.getMemstoreLoadPercent() / 100.0; + double multiplier = Math.pow(percent, 4.0); + // shouldn't ever happen, but just incase something changes in the statistic data + if (multiplier > 1) { + LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " + + "down to the max backoff"); + multiplier = 1; + } + return (long) (multiplier * maxBackoff); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java new file mode 100644 index 00000000000..a3b8e11a632 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Track the statistics for a single region + */ +@InterfaceAudience.Private +public class ServerStatistics { + + private Map + stats = new TreeMap(Bytes.BYTES_COMPARATOR); + + /** + * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update, + * as something gets set + * @param region + * @param currentStats + */ + public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) { + RegionStatistics regionStat = this.stats.get(region); + if(regionStat == null){ + regionStat = new RegionStatistics(); + this.stats.put(region, regionStat); + } + + regionStat.update(currentStats); + } + + @InterfaceAudience.Private + public RegionStatistics getStatsForRegion(byte[] regionName){ + return stats.get(regionName); + } + + public static class RegionStatistics{ + private int memstoreLoad = 0; + + public void update(ClientProtos.RegionLoadStats currentStats) { + this.memstoreLoad = currentStats.getMemstoreLoad(); + } + + public int getMemstoreLoadPercent(){ + return this.memstoreLoad; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 8c75f4fa3f0..8433ceebbab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration()); + this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); this.operationTimeout = conn.getConfiguration().getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 70da40c0389..1d42a828275 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -114,17 +114,23 @@ public final class ResponseConverter { } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException())); + responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells)); + responseValue = ProtobufUtil.toResult(roe.getResult(), cells); + // add the load stats, if we got any + if (roe.hasLoadStats()) { + ((Result) responseValue).addResults(roe.getLoadStats()); + } } else if (roe.hasServiceResult()) { - results.add(regionName, roe.getIndex(), roe.getServiceResult()); + responseValue = roe.getServiceResult(); } else { // no result & no exception. Unexpected. throw new IllegalStateException("No result & no exception roe=" + roe + " for region " + actions.getRegion()); } + results.add(regionName, roe.getIndex(), responseValue); } } @@ -149,9 +155,11 @@ public final class ResponseConverter { * @param r * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, + ClientProtos.RegionLoadStats stats) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); + if(stats != null) builder.setLoadStats(stats); return builder; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d2196382f94..88a95fbfd50 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -190,7 +190,7 @@ public class TestAsyncProcess { } }); - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -208,7 +208,7 @@ public class TestAsyncProcess { } } - static class CallerWithFailure extends RpcRetryingCaller{ + static class CallerWithFailure extends RpcRetryingCallerImpl{ public CallerWithFailure() { super(100, 100, 9); @@ -294,7 +294,7 @@ public class TestAsyncProcess { replicaCalls.incrementAndGet(); } - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java new file mode 100644 index 00000000000..88e409d5baf --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestClientExponentialBackoff { + + ServerName server = Mockito.mock(ServerName.class); + byte[] regionname = Bytes.toBytes("region"); + + @Test + public void testNulls() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + assertEquals(0, backoff.getBackoffTime(null, null, null)); + + // server name doesn't matter to calculation, but check it now anyways + assertEquals(0, backoff.getBackoffTime(server, null, null)); + assertEquals(0, backoff.getBackoffTime(server, regionname, null)); + + // check when no stats for the region yet + ServerStatistics stats = new ServerStatistics(); + assertEquals(0, backoff.getBackoffTime(server, regionname, stats)); + } + + @Test + public void testMaxLoad() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + update(stats, 100); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + + // another policy with a different max timeout + long max = 100; + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max); + ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // test beyond 100 still doesn't exceed the max + update(stats, 101); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // and that when we are below 100, its less than the max timeout + update(stats, 99); + assertTrue(backoff.getBackoffTime(server, + regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max); + } + + /** + * Make sure that we get results in the order that we expect - backoff for a load of 1 should + * less than backoff for 10, which should be less than that for 50. + */ + @Test + public void testResultOrdering() { + Configuration conf = new Configuration(false); + // make the max timeout really high so we get differentiation between load factors + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long previous = backoff.getBackoffTime(server, regionname, stats); + for (int i = 1; i <= 100; i++) { + update(stats, i); + long next = backoff.getBackoffTime(server, regionname, stats); + assertTrue( + "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " + + "load " + i, previous < next); + previous = next; + } + } + + private void update(ServerStatistics stats, int load) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad + (load).build(); + stats.update(regionname, stat); + } +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java index 080cd8b5929..e82e59d3b45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java @@ -564,7 +564,7 @@ public class TestFastFailWithoutTestUtil { public RpcRetryingCaller getRpcRetryingCaller(int pauseTime, int retries, RetryingCallerInterceptor interceptor) { - return new RpcRetryingCaller(pauseTime, retries, interceptor, 9) { + return new RpcRetryingCallerImpl(pauseTime, retries, interceptor, 9) { @Override public Void callWithRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 60017670861..33b71ad01c8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1099,6 +1099,12 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + /** Config key for if the server should send backpressure and if the client should listen to + * that backpressure from the server */ + public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; + public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index c36662ecdad..ab86e1e269c 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -26210,6 +26210,482 @@ public final class ClientProtos { // @@protoc_insertion_point(class_scope:RegionAction) } + public interface RegionLoadStatsOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional int32 memstoreLoad = 1 [default = 0]; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + boolean hasMemstoreLoad(); + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + int getMemstoreLoad(); + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+   *
+   * Statistics about the current load on the region
+   * 
+ */ + public static final class RegionLoadStats extends + com.google.protobuf.GeneratedMessage + implements RegionLoadStatsOrBuilder { + // Use RegionLoadStats.newBuilder() to construct. + private RegionLoadStats(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private RegionLoadStats(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final RegionLoadStats defaultInstance; + public static RegionLoadStats getDefaultInstance() { + return defaultInstance; + } + + public RegionLoadStats getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private RegionLoadStats( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + memstoreLoad_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public RegionLoadStats parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new RegionLoadStats(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional int32 memstoreLoad = 1 [default = 0]; + public static final int MEMSTORELOAD_FIELD_NUMBER = 1; + private int memstoreLoad_; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + + private void initFields() { + memstoreLoad_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt32(1, memstoreLoad_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, memstoreLoad_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) obj; + + boolean result = true; + result = result && (hasMemstoreLoad() == other.hasMemstoreLoad()); + if (hasMemstoreLoad()) { + result = result && (getMemstoreLoad() + == other.getMemstoreLoad()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasMemstoreLoad()) { + hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER; + hash = (53 * hash) + getMemstoreLoad(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+     *
+     * Statistics about the current load on the region
+     * 
+ */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + memstoreLoad_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats build() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.memstoreLoad_ = memstoreLoad_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) return this; + if (other.hasMemstoreLoad()) { + setMemstoreLoad(other.getMemstoreLoad()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional int32 memstoreLoad = 1 [default = 0]; + private int memstoreLoad_ ; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder setMemstoreLoad(int value) { + bitField0_ |= 0x00000001; + memstoreLoad_ = value; + onChanged(); + return this; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder clearMemstoreLoad() { + bitField0_ = (bitField0_ & ~0x00000001); + memstoreLoad_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RegionLoadStats) + } + + static { + defaultInstance = new RegionLoadStats(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RegionLoadStats) + } + public interface ResultOrExceptionOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -26286,6 +26762,32 @@ public final class ClientProtos { * */ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder(); + + // optional .RegionLoadStats loadStats = 5; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + boolean hasLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder(); } /** * Protobuf type {@code ResultOrException} @@ -26389,6 +26891,19 @@ public final class ClientProtos { bitField0_ |= 0x00000008; break; } + case 42: { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder subBuilder = null; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + subBuilder = loadStats_.toBuilder(); + } + loadStats_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(loadStats_); + loadStats_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000010; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -26533,11 +27048,46 @@ public final class ClientProtos { return serviceResult_; } + // optional .RegionLoadStats loadStats = 5; + public static final int LOADSTATS_FIELD_NUMBER = 5; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + return loadStats_; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + return loadStats_; + } + private void initFields() { index_ = 0; result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance(); exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance(); serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance(); + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -26575,6 +27125,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeMessage(5, loadStats_); + } getUnknownFields().writeTo(output); } @@ -26600,6 +27153,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(5, loadStats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -26643,6 +27200,11 @@ public final class ClientProtos { result = result && getServiceResult() .equals(other.getServiceResult()); } + result = result && (hasLoadStats() == other.hasLoadStats()); + if (hasLoadStats()) { + result = result && getLoadStats() + .equals(other.getLoadStats()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -26672,6 +27234,10 @@ public final class ClientProtos { hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER; hash = (53 * hash) + getServiceResult().hashCode(); } + if (hasLoadStats()) { + hash = (37 * hash) + LOADSTATS_FIELD_NUMBER; + hash = (53 * hash) + getLoadStats().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -26783,6 +27349,7 @@ public final class ClientProtos { getResultFieldBuilder(); getExceptionFieldBuilder(); getServiceResultFieldBuilder(); + getLoadStatsFieldBuilder(); } } private static Builder create() { @@ -26811,6 +27378,12 @@ public final class ClientProtos { serviceResultBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -26867,6 +27440,14 @@ public final class ClientProtos { } else { result.serviceResult_ = serviceResultBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + if (loadStatsBuilder_ == null) { + result.loadStats_ = loadStats_; + } else { + result.loadStats_ = loadStatsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -26895,6 +27476,9 @@ public final class ClientProtos { if (other.hasServiceResult()) { mergeServiceResult(other.getServiceResult()); } + if (other.hasLoadStats()) { + mergeLoadStats(other.getLoadStats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -27374,6 +27958,159 @@ public final class ClientProtos { return serviceResultBuilder_; } + // optional .RegionLoadStats loadStats = 5; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> loadStatsBuilder_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + if (loadStatsBuilder_ == null) { + return loadStats_; + } else { + return loadStatsBuilder_.getMessage(); + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + loadStats_ = value; + onChanged(); + } else { + loadStatsBuilder_.setMessage(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder builderForValue) { + if (loadStatsBuilder_ == null) { + loadStats_ = builderForValue.build(); + onChanged(); + } else { + loadStatsBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder mergeLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (((bitField0_ & 0x00000010) == 0x00000010) && + loadStats_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) { + loadStats_ = + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder(loadStats_).mergeFrom(value).buildPartial(); + } else { + loadStats_ = value; + } + onChanged(); + } else { + loadStatsBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder clearLoadStats() { + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + onChanged(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder getLoadStatsBuilder() { + bitField0_ |= 0x00000010; + onChanged(); + return getLoadStatsFieldBuilder().getBuilder(); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + if (loadStatsBuilder_ != null) { + return loadStatsBuilder_.getMessageOrBuilder(); + } else { + return loadStats_; + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> + getLoadStatsFieldBuilder() { + if (loadStatsBuilder_ == null) { + loadStatsBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>( + loadStats_, + getParentForChildren(), + isClean()); + loadStats_ = null; + } + return loadStatsBuilder_; + } + // @@protoc_insertion_point(builder_scope:ResultOrException) } @@ -31066,6 +31803,11 @@ public final class ClientProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RegionAction_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RegionLoadStats_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RegionLoadStats_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_ResultOrException_descriptor; private static @@ -31180,31 +31922,33 @@ public final class ClientProtos { "\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" + "rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" + "egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" + - "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001\n\021Resul" + - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - "essorServiceResult\"f\n\022RegionActionResult", - "\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" + - "ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" + - "ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" + - "2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" + - "ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" + - "nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" + - "ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" + - "stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCli" + - "entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" + - "onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR", - "esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" + - "onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" + - "uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" + - "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" + - "essorServiceResponse\022R\n\027ExecRegionServer" + - "Service\022\032.CoprocessorServiceRequest\032\033.Co" + - "processorServiceResponse\022&\n\005Multi\022\r.Mult" + - "iRequest\032\016.MultiResponseBB\n*org.apache.h" + - "adoop.hbase.protobuf.generatedB\014ClientPr" + - "otosH\001\210\001\001\240\001\001" + "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017Region" + + "LoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021R" + + "esultOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006resul" + + "t\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.Na" + + "meBytesPair\0221\n\016service_result\030\004 \001(\0132\031.Co", + "processorServiceResult\022#\n\tloadStats\030\005 \001(" + + "\0132\020.RegionLoadStats\"f\n\022RegionActionResul" + + "t\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrE" + + "xception\022!\n\texception\030\002 \001(\0132\016.NameBytesP" + + "air\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(" + + "\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\t" + + "condition\030\003 \001(\0132\n.Condition\"S\n\rMultiResp" + + "onse\022/\n\022regionActionResult\030\001 \003(\0132\023.Regio" + + "nActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Cons" + + "istency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCl", + "ientService\022 \n\003Get\022\013.GetRequest\032\014.GetRes" + + "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" + + "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" + + "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe" + + "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" + + "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + + "cessorServiceResponse\022R\n\027ExecRegionServe" + + "rService\022\032.CoprocessorServiceRequest\032\033.C" + + "oprocessorServiceResponse\022&\n\005Multi\022\r.Mul" + + "tiRequest\032\016.MultiResponseBB\n*org.apache.", + "hadoop.hbase.protobuf.generatedB\014ClientP" + + "rotosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -31361,26 +32105,32 @@ public final class ClientProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionAction_descriptor, new java.lang.String[] { "Region", "Atomic", "Action", }); - internal_static_ResultOrException_descriptor = + internal_static_RegionLoadStats_descriptor = getDescriptor().getMessageTypes().get(22); + internal_static_RegionLoadStats_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RegionLoadStats_descriptor, + new java.lang.String[] { "MemstoreLoad", }); + internal_static_ResultOrException_descriptor = + getDescriptor().getMessageTypes().get(23); internal_static_ResultOrException_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ResultOrException_descriptor, - new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", }); + new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", "LoadStats", }); internal_static_RegionActionResult_descriptor = - getDescriptor().getMessageTypes().get(23); + getDescriptor().getMessageTypes().get(24); internal_static_RegionActionResult_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionActionResult_descriptor, new java.lang.String[] { "ResultOrException", "Exception", }); internal_static_MultiRequest_descriptor = - getDescriptor().getMessageTypes().get(24); + getDescriptor().getMessageTypes().get(25); internal_static_MultiRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiRequest_descriptor, new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", }); internal_static_MultiResponse_descriptor = - getDescriptor().getMessageTypes().get(25); + getDescriptor().getMessageTypes().get(26); internal_static_MultiResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiResponse_descriptor, diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index ede1c26e6a3..1a3c43e4a0f 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -353,6 +353,14 @@ message RegionAction { repeated Action action = 3; } +/* +* Statistics about the current load on the region +*/ +message RegionLoadStats{ + // percent load on the memstore. Guaranteed to be positive, between 0 and 100 + optional int32 memstoreLoad = 1 [default = 0]; +} + /** * Either a Result or an Exception NameBytesPair (keyed by * exception name whose value is the exception stringified) @@ -366,6 +374,8 @@ message ResultOrException { optional NameBytesPair exception = 3; // result if this was a coprocessor service call optional CoprocessorServiceResult service_result = 4; + // current load on the region + optional RegionLoadStats loadStats = 5; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b4b6adce1cc..31be7ad1112 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -690,7 +690,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List toRetry = new ArrayList(); Configuration conf = getConf(); - boolean success = RpcRetryingCallerFactory.instantiate(conf). newCaller() + boolean success = RpcRetryingCallerFactory.instantiate(conf, + null). newCaller() .callWithRetries(svrCallable, Integer.MAX_VALUE); if (!success) { LOG.warn("Attempt to bulk load region containing " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 95ba337d97b..428e8575a9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; @@ -122,6 +123,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -544,6 +546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final Durability durability; + private final boolean regionStatsEnabled; /** * HRegion constructor. This constructor should only be used for testing and @@ -685,6 +688,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); configurationManager = Optional.absent(); + + // disable stats tracking system tables, but check the config for everything else + this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ? + false : + conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); } void setHTableSpecificConf() { @@ -5187,18 +5197,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return results; } - public void mutateRow(RowMutations rm) throws IOException { + public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); } /** * Perform atomic mutations within the region w/o nonces. * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} */ - public void mutateRowsWithLocks(Collection mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection mutations, Collection rowsToLock) throws IOException { - mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); + return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); } /** @@ -5213,10 +5223,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ - public void mutateRowsWithLocks(Collection mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); processRowsWithLocks(proc, -1, nonceGroup, nonce); + return getRegionStats(); + } + + /** + * @return the current load statistics for the the region + */ + public ClientProtos.RegionLoadStats getRegionStats() { + if (!regionStatsEnabled) { + return null; + } + ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); + stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this + .memstoreFlushSize))); + return stats.build(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 97c698733ba..32d59d41870 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -257,8 +257,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private static ResultOrException getResultOrException( - final ClientProtos.Result r, final int index) { - return getResultOrException(ResponseConverter.buildActionResult(r), index); + final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) { + return getResultOrException(ResponseConverter.buildActionResult(r, stats), index); } private static ResultOrException getResultOrException(final Exception e, final int index) { @@ -355,7 +355,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. * @throws IOException */ - private void mutateRows(final HRegion region, final List actions, + private ClientProtos.RegionLoadStats mutateRows(final HRegion region, + final List actions, final CellScanner cellScanner) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); @@ -381,7 +382,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } - region.mutateRow(rm); + return region.mutateRow(rm); } /** @@ -661,7 +662,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case SUCCESS: builder.addResultOrException(getResultOrException( - ClientProtos.Result.getDefaultInstance(), index)); + ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats())); break; } } @@ -1815,6 +1816,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, qualifier, compareOp, comparator); } else { + ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(), + cellScanner); + // add the stats to the request + if(stats != null) { + responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() + .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); + } mutateRows(region, regionAction.getActionList(), cellScanner); processed = Boolean.TRUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index ff5f2f53ecd..59a1b4395cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -162,7 +162,7 @@ public class WALEditsReplaySink { private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, final List entries) throws IOException { try { - RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null); ReplayServerCallable callable = new ReplayServerCallable(this.conn, this.tableName, regionLoc, regionInfo, entries); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index a99b0476b12..998cdf0e49f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -134,7 +134,7 @@ public class HConnectionTestingUtility { Mockito.doNothing().when(c).decCount(); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, - RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR)); + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); HTableInterface t = Mockito.mock(HTableInterface.class); Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t); ResultScanner rs = Mockito.mock(ResultScanner.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java new file mode 100644 index 00000000000..dfb9a7078ea --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test that we can actually send and use region metrics to slowdown client writes + */ +@Category(MediumTests.class) +public class TestClientPushback { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] tableName = Bytes.toBytes("client-pushback"); + private static final byte[] family = Bytes.toBytes("f"); + private static final byte[] qualifier = Bytes.toBytes("q"); + private static long flushSizeBytes = 1024; + + @BeforeClass + public static void setupCluster() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + // enable backpressure + conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + // turn the memstore size way down so we don't need to write a lot to see changes in memstore + // load + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + flushSizeBytes); + // ensure we block the flushes when we are double that flushsize + conf.setLong("hbase.hregion.memstore.block.multiplier", 2); + + UTIL.startMiniCluster(); + UTIL.createTable(tableName, family); + } + + @AfterClass + public static void teardownCluster() throws Exception{ + UTIL.shutdownMiniCluster(); + } + + @Test + public void testClientTrackesServerPushback() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + TableName tablename = TableName.valueOf(tableName); + HTable table = new HTable(conf, tablename); + //make sure we flush after each put + table.setAutoFlushTo(true); + + // write some data + Put p = new Put(Bytes.toBytes("row")); + p.add(family, qualifier, Bytes.toBytes("value1")); + table.put(p); + + // get the stats for the region hosting our table + ClusterConnection conn = ConnectionManager.getConnectionInternal(conf); + ServerStatisticTracker stats = conn.getStatisticsTracker(); + assertNotNull( "No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + byte[] regionName = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename).get + (0).getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getServerStatsForTesting(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + assertEquals(15, regionStats.getMemstoreLoadPercent()); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 837e37bb2b3..3d1b1c80832 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -76,7 +76,7 @@ public class TestReplicasClient { private static final Log LOG = LogFactory.getLog(TestReplicasClient.class); static { - ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL); } private static final int NB_SERVERS = 1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index f5380efb43f..7ca12f03761 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -74,7 +75,7 @@ public class TestRegionReplicaReplicationEndpoint { private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class); static { - ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL); } private static final int NB_SERVERS = 2;