diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index e0c14a6460d..8b1db8f881d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; @@ -313,7 +315,8 @@ class AsyncProcess { * Uses default ExecutorService for this AP (must have been created with one). */ public AsyncRequestFuture submit(TableName tableName, List rows, - boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { + boolean atLeastOne, Batch.Callback callback, boolean needResults) + throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } @@ -374,7 +377,7 @@ class AsyncProcess { locationErrors = new ArrayList(); locationErrorRows = new ArrayList(); LOG.error("Failed to get region location ", ex); - // This action failed before creating ars. Add it to retained but do not add to submit list. + // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. retainedActions.add(new Action(r, ++posInList)); locationErrors.add(ex); @@ -918,14 +921,12 @@ class AsyncProcess { return loc; } - - /** * Send a multi action structure to the servers, after a delay depending on the attempt * number. Asynchronous. * * @param actionsByServer the actions structured by regions - * @param numAttempt the attempt number. + * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ private void sendMultiAction(Map> actionsByServer, @@ -935,33 +936,98 @@ class AsyncProcess { int actionsRemaining = actionsByServer.size(); // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry> e : actionsByServer.entrySet()) { - final ServerName server = e.getKey(); - final MultiAction multiAction = e.getValue(); + ServerName server = e.getKey(); + MultiAction multiAction = e.getValue(); incTaskCounters(multiAction.getRegions(), server); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server)); - if ((--actionsRemaining == 0) && reuseThread) { - runnable.run(); - } else { - try { - pool.submit(runnable); - } catch (RejectedExecutionException ree) { - // This should never happen. But as the pool is provided by the end user, let's secure - // this a little. - decTaskCounters(multiAction.getRegions(), server); - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + server.getServerName(), ree); - // We're likely to fail again, but this will increment the attempt counter, so it will - // finish. - receiveGlobalFailure(multiAction, server, numAttempt, ree); + Collection runnables = getNewMultiActionRunnable(server, multiAction, + numAttempt); + // make sure we correctly count the number of runnables before we try to reuse the send + // thread, in case we had to split the request into different runnables because of backoff + if (runnables.size() > actionsRemaining) { + actionsRemaining = runnables.size(); + } + + // run all the runnables + for (Runnable runnable : runnables) { + if ((--actionsRemaining == 0) && reuseThread) { + runnable.run(); + } else { + try { + pool.submit(runnable); + } catch (RejectedExecutionException ree) { + // This should never happen. But as the pool is provided by the end user, let's secure + // this a little. + decTaskCounters(multiAction.getRegions(), server); + LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName(), ree); + // We're likely to fail again, but this will increment the attempt counter, so it will + // finish. + receiveGlobalFailure(multiAction, server, numAttempt, ree); + } } } } + if (actionsForReplicaThread != null) { startWaitingForReplicaCalls(actionsForReplicaThread); } } + private Collection getNewMultiActionRunnable(ServerName server, + MultiAction multiAction, + int numAttempt) { + // no stats to manage, just do the standard action + if (AsyncProcess.this.connection.getStatisticsTracker() == null) { + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server))); + } + + // group the actions by the amount of delay + Map actions = new HashMap(multiAction + .size()); + + // split up the actions + for (Map.Entry>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(server, e.getKey()); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List toReturn = new ArrayList(actions.size()); + for (DelayingRunner runner : actions.values()) { + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + + } + return toReturn; + } + + /** + * @param server server location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(ServerName server, byte[] regionName) { + ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(server); + return AsyncProcess.this.connection.getBackoffPolicy() + .getBackoffTime(server, regionName, stats); + } + /** * Starts waiting to issue replica calls on a different thread; or issues them immediately. */ @@ -1169,6 +1235,13 @@ class AsyncProcess { ++failed; } } else { + // update the stats about the region, if its a user table. We don't want to slow down + // updates to meta tables, especially from internal updates (master, etc). + if (AsyncProcess.this.connection.getStatisticsTracker() != null) { + result = ResultStatsUtil.updateStats(result, + AsyncProcess.this.connection.getStatisticsTracker(), server, regionName); + } + if (callback != null) { try { //noinspection unchecked @@ -1498,7 +1571,6 @@ class AsyncProcess { } } - @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected AsyncRequestFutureImpl createAsyncRequestFuture( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 8989725729e..45b99ebe071 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; @@ -288,5 +289,14 @@ public interface ClusterConnection extends HConnection { * @return true if this is a managed connection. */ boolean isManaged(); -} + /** + * @return the current statistics tracker associated with this connection + */ + ServerStatisticTracker getStatisticsTracker(); + + /** + * @return the configured client backoff policy + */ + ClientBackoffPolicy getBackoffPolicy(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 001132835be..53c1271c265 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -447,4 +448,14 @@ abstract class ConnectionAdapter implements ClusterConnection { public boolean isManaged() { return wrappedConnection.isManaged(); } + + @Override + public ServerStatisticTracker getStatisticsTracker() { + return wrappedConnection.getStatisticsTracker(); + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return wrappedConnection.getBackoffPolicy(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index f813ebd2f81..acb64c8d80b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -541,6 +543,8 @@ class ConnectionManager { final int rpcTimeout; private NonceGenerator nonceGenerator = null; private final AsyncProcess asyncProcess; + // single tracker per connection + private final ServerStatisticTracker stats; private volatile boolean closed; private volatile boolean aborted; @@ -596,6 +600,8 @@ class ConnectionManager { */ Registry registry; + private final ClientBackoffPolicy backoffPolicy; + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { this(conf, managed, null, null); } @@ -670,9 +676,11 @@ class ConnectionManager { } else { this.nonceGenerator = new NoNonceGenerator(); } + stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); } @Override @@ -2207,7 +2215,8 @@ class ConnectionManager { protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. return new AsyncProcess(this, conf, this.batchPool, - RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf)); + RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false, + RpcControllerFactory.instantiate(conf)); } @Override @@ -2215,6 +2224,16 @@ class ConnectionManager { return asyncProcess; } + @Override + public ServerStatisticTracker getStatisticsTracker() { + return this.stats; + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return this.backoffPolicy; + } + /* * Return the number of cached region for a table. It will only be called * from a unit test. @@ -2506,7 +2525,8 @@ class ConnectionManager { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor); + return RpcRetryingCallerFactory + .instantiate(conf, this.interceptor, this.getStatisticsTracker()); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java new file mode 100644 index 00000000000..83c73b6a36a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.List; +import java.util.Map; + +/** + * A wrapper for a runnable for a group of actions for a single regionserver. + *

+ * This can be used to build up the actions that should be taken and then + *

+ *

+ * This class exists to simulate using a ScheduledExecutorService with just a regular + * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could + * only be removed if we change the expectations in HTable around the pool the client is able to + * pass in and even if we deprecate the current APIs would require keeping this class around + * for the interim to bridge between the legacy ExecutorServices and the scheduled pool. + *

+ */ +@InterfaceAudience.Private +public class DelayingRunner implements Runnable { + private static final Log LOG = LogFactory.getLog(DelayingRunner.class); + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + private long sleepTime; + private MultiAction actions = new MultiAction(); + private Runnable runnable; + + public DelayingRunner(long sleepTime, Map.Entry>> e) { + this.sleepTime = sleepTime; + add(e); + } + + public void setRunner(Runnable runner) { + this.runnable = runner; + } + + @Override + public void run() { + if (!sleep()) { + LOG.warn( + "Interrupted while sleeping for expected sleep time " + sleepTime + " ms"); + } + //TODO maybe we should consider switching to a listenableFuture for the actual callable and + // then handling the results/errors as callbacks. That way we can decrement outstanding tasks + // even if we get interrupted here, but for now, we still need to run so we decrement the + // outstanding tasks + this.runnable.run(); + } + + /** + * Sleep for an expected amount of time. + *

+ * This is nearly a copy of what the Sleeper does, but with the ability to know if you + * got interrupted while sleeping. + *

+ * + * @return true if the sleep completely entirely successfully, + * but otherwise false if the sleep was interrupted. + */ + private boolean sleep() { + long now = EnvironmentEdgeManager.currentTime(); + long startTime = now; + long waitTime = sleepTime; + while (waitTime > 0) { + long woke = -1; + try { + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } + woke = EnvironmentEdgeManager.currentTime(); + } catch (InterruptedException iex) { + return false; + } + // Recalculate waitTime. + woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke; + waitTime = waitTime - (woke - startTime); + } + return true; + } + + public void add(Map.Entry>> e) { + actions.add(e.getKey(), e.getValue()); + } + + public MultiAction getActions() { + return actions; + } + + public long getSleepTime() { + return sleepTime; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fd0470af21d..0508fce5e4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1887,8 +1887,9 @@ public class HTable implements HTableInterface, RegionLocator { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, - RpcRetryingCallerFactory.instantiate(configuration), true, - RpcControllerFactory.instantiate(configuration)); + RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + true, RpcControllerFactory.instantiate(configuration)); + AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 8f1dc4d0e1b..16ab8524cdd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,12 +69,24 @@ public final class MultiAction { * @param a */ public void add(byte[] regionName, Action a) { + add(regionName, Arrays.asList(a)); + } + + /** + * Add an Action to this container based on it's regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region. + * + * @param regionName + * @param actionList list of actions to add for the region + */ + public void add(byte[] regionName, List> actionList){ List> rsActions = actions.get(regionName); if (rsActions == null) { - rsActions = new ArrayList>(); + rsActions = new ArrayList>(actionList.size()); actions.put(regionName, rsActions); } - rsActions.add(a); + rsActions.addAll(actionList); } public void setNonceGroup(long nonceGroup) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 8303121cc36..08d9b801627 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -94,6 +95,7 @@ public class Result implements CellScannable, CellScanner { * Index for where we are when Result is acting as a {@link CellScanner}. */ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; + private ClientProtos.RegionLoadStats stats; /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. @@ -794,4 +796,20 @@ public class Result implements CellScannable, CellScanner { public boolean isStale() { return stale; } -} + + /** + * Add load information about the region to the information about the result + * @param loadStats statistics about the current region from which this was returned + */ + public void addResults(ClientProtos.RegionLoadStats loadStats) { + this.stats = loadStats; + } + + /** + * @return the associated statistics about the region from which this was returned. Can be + * null if stats are disabled. + */ + public ClientProtos.RegionLoadStats getStats() { + return stats; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java new file mode 100644 index 00000000000..3caa63e4a35 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Result} with some statistics about the server/region status + */ +@InterfaceAudience.Private +public final class ResultStatsUtil { + + private ResultStatsUtil() { + //private ctor for util class + } + + /** + * Update the stats for the specified region if the result is an instance of {@link + * ResultStatsUtil} + * + * @param r object that contains the result and possibly the statistics about the region + * @param serverStats stats tracker to update from the result + * @param server server from which the result was obtained + * @param regionName full region name for the stats. + * @return the underlying {@link Result} if the passed result is an {@link + * ResultStatsUtil} or just returns the result; + */ + public static T updateStats(T r, ServerStatisticTracker serverStats, + ServerName server, byte[] regionName) { + if (!(r instanceof Result)) { + return r; + } + Result result = (Result) r; + // early exit if there are no stats to collect + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return r; + } + + if (regionName != null) { + serverStats.updateRegionStats(server, regionName, stats); + } + + return r; + } + + public static T updateStats(T r, ServerStatisticTracker stats, + HRegionLocation regionLocation) { + byte[] regionName = null; + ServerName server = null; + if (regionLocation != null) { + server = regionLocation.getServerName(); + regionName = regionLocation.getRegionInfo().getRegionName(); + } + + return updateStats(r, stats, server, regionName); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index a2c4d991d7f..807c227f12c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,93 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.classification.InterfaceStability; -import com.google.protobuf.ServiceException; +import java.io.IOException; /** - * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client - * threadlocal outstanding timeouts as so we don't persist too much. - * Dynamic rather than static so can set the generic appropriately. * - * This object has a state. It should not be used by in parallel by different threads. - * Reusing it is possible however, even between multiple threads. However, the user will - * have to manage the synchronization on its side: there is no synchronization inside the class. */ -@InterfaceAudience.Private -public class RpcRetryingCaller { - public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); - /** - * When we started making calls. - */ - private long globalStartTime; - /** - * Start and end times for a single call. - */ - private final static int MIN_RPC_TIMEOUT = 2000; - /** How many retries are allowed before we start to log */ - private final int startLogErrorsCnt; - - private final long pause; - private final int retries; - private final AtomicBoolean cancelled = new AtomicBoolean(false); - private final RetryingCallerInterceptor interceptor; - private final RetryingCallerInterceptorContext context; - - public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); - } - - public RpcRetryingCaller(long pause, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { - this.pause = pause; - this.retries = retries; - this.interceptor = interceptor; - context = interceptor.createEmptyContext(); - this.startLogErrorsCnt = startLogErrorsCnt; - } - - private int getRemainingTime(int callTimeout) { - if (callTimeout <= 0) { - return 0; - } else { - if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; - int remainingTime = (int) (callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); - if (remainingTime < MIN_RPC_TIMEOUT) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remainingTime = MIN_RPC_TIMEOUT; - } - return remainingTime; - } - } - - public void cancel(){ - cancelled.set(true); - synchronized (cancelled){ - cancelled.notifyAll(); - } - } +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RpcRetryingCaller { + void cancel(); /** * Retries if invocation fails. @@ -112,75 +38,8 @@ public class RpcRetryingCaller { * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ - public T callWithRetries(RetryingCallable callable, int callTimeout) - throws IOException, RuntimeException { - List exceptions = - new ArrayList(); - this.globalStartTime = EnvironmentEdgeManager.currentTime(); - context.clear(); - for (int tries = 0;; tries++) { - long expectedSleep; - try { - callable.prepare(tries != 0); // if called with false, check table status on ZK - interceptor.intercept(context.prepare(callable, tries)); - return callable.call(getRemainingTime(callTimeout)); - } catch (PreemptiveFastFailException e) { - throw e; - } catch (Throwable t) { - ExceptionUtil.rethrowIfInterrupt(t); - if (tries > startLogErrorsCnt) { - LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + - (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " - + "cancelled=" + cancelled.get() + ", msg=" - + callable.getExceptionMessageAdditionalDetail()); - } - - // translateException throws exception when should not retry: i.e. when request is bad. - interceptor.handleFailure(context, t); - t = translateException(t); - callable.throwable(t, retries != 1); - RetriesExhaustedException.ThrowableWithExtraContext qt = - new RetriesExhaustedException.ThrowableWithExtraContext(t, - EnvironmentEdgeManager.currentTime(), toString()); - exceptions.add(qt); - if (tries >= retries - 1) { - throw new RetriesExhaustedException(tries, exceptions); - } - // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be - // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time - expectedSleep = callable.sleep(pause, tries + 1); - - // If, after the planned sleep, there won't be enough time left, we stop now. - long duration = singleCallDuration(expectedSleep); - if (duration > callTimeout) { - String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + - ": " + callable.getExceptionMessageAdditionalDetail(); - throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); - } - } finally { - interceptor.updateFailureInfo(context); - } - try { - if (expectedSleep > 0) { - synchronized (cancelled) { - if (cancelled.get()) return null; - cancelled.wait(expectedSleep); - } - } - if (cancelled.get()) return null; - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); - } - } - } - - /** - * @return Calculate how long a single call took - */ - private long singleCallDuration(final long expectedSleep) { - return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; - } + T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException; /** * Call the server once only. @@ -191,62 +50,6 @@ public class RpcRetryingCaller { * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ - public T callWithoutRetries(RetryingCallable callable, int callTimeout) - throws IOException, RuntimeException { - // The code of this method should be shared with withRetries. - this.globalStartTime = EnvironmentEdgeManager.currentTime(); - try { - callable.prepare(false); - return callable.call(callTimeout); - } catch (Throwable t) { - Throwable t2 = translateException(t); - ExceptionUtil.rethrowIfInterrupt(t2); - // It would be nice to clear the location cache here. - if (t2 instanceof IOException) { - throw (IOException)t2; - } else { - throw new RuntimeException(t2); - } - } - } - - /** - * Get the good or the remote exception if any, throws the DoNotRetryIOException. - * @param t the throwable to analyze - * @return the translated exception, if it's not a DoNotRetryIOException - * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. - */ - static Throwable translateException(Throwable t) throws DoNotRetryIOException { - if (t instanceof UndeclaredThrowableException) { - if (t.getCause() != null) { - t = t.getCause(); - } - } - if (t instanceof RemoteException) { - t = ((RemoteException)t).unwrapRemoteException(); - } - if (t instanceof LinkageError) { - throw new DoNotRetryIOException(t); - } - if (t instanceof ServiceException) { - ServiceException se = (ServiceException)t; - Throwable cause = se.getCause(); - if (cause != null && cause instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)cause; - } - // Don't let ServiceException out; its rpc specific. - t = cause; - // t could be a RemoteException so go aaround again. - translateException(t); - } else if (t instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)t; - } - return t; - } - - @Override - public String toString() { - return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + - ", pause=" + pause + ", retries=" + retries + '}'; - } + T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 9f059977c2b..6f2760fcfa1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory { private final int retries; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + private final boolean enableBackPressure; + private ServerStatisticTracker stats; public RpcRetryingCallerFactory(Configuration conf) { this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); @@ -49,27 +51,53 @@ public class RpcRetryingCallerFactory { startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; + enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + } + + /** + * Set the tracker that should be used for tracking statistics about the server + */ + public void setStatisticTracker(ServerStatisticTracker statisticTracker) { + this.stats = statisticTracker; } public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - return new RpcRetryingCaller(pause, retries, interceptor, startLogErrorsCnt); + RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, + startLogErrorsCnt); + + // wrap it with stats, if we are tracking them + if (enableBackPressure && this.stats != null) { + caller = new StatsTrackingRpcRetryingCaller(caller, this.stats); + } + + return caller; } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - + public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor) { + ServerStatisticTracker stats) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { return new RpcRetryingCallerFactory(configuration, interceptor); } - return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); + RpcRetryingCallerFactory factory = ReflectionUtils.instantiateWithCustomCtor( + rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); + + // setting for backwards compat with existing caller factories, rather than in the ctor + factory.setStatisticTracker(stats); + return factory; } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java new file mode 100644 index 00000000000..1d037bc7b9b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -0,0 +1,238 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.protobuf.ServiceException; + +/** + * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client + * threadlocal outstanding timeouts as so we don't persist too much. + * Dynamic rather than static so can set the generic appropriately. + * + * This object has a state. It should not be used by in parallel by different threads. + * Reusing it is possible however, even between multiple threads. However, the user will + * have to manage the synchronization on its side: there is no synchronization inside the class. + */ +@InterfaceAudience.Private +public class RpcRetryingCallerImpl implements RpcRetryingCaller { + public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class); + /** + * When we started making calls. + */ + private long globalStartTime; + /** + * Start and end times for a single call. + */ + private final static int MIN_RPC_TIMEOUT = 2000; + /** How many retries are allowed before we start to log */ + private final int startLogErrorsCnt; + + private final long pause; + private final int retries; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final RetryingCallerInterceptor interceptor; + private final RetryingCallerInterceptorContext context; + + public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { + this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); + } + + public RpcRetryingCallerImpl(long pause, int retries, + RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { + this.pause = pause; + this.retries = retries; + this.interceptor = interceptor; + context = interceptor.createEmptyContext(); + this.startLogErrorsCnt = startLogErrorsCnt; + } + + private int getRemainingTime(int callTimeout) { + if (callTimeout <= 0) { + return 0; + } else { + if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; + int remainingTime = (int) (callTimeout - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + if (remainingTime < MIN_RPC_TIMEOUT) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. + remainingTime = MIN_RPC_TIMEOUT; + } + return remainingTime; + } + } + + @Override + public void cancel(){ + cancelled.set(true); + synchronized (cancelled){ + cancelled.notifyAll(); + } + } + + @Override + public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + List exceptions = + new ArrayList(); + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + context.clear(); + for (int tries = 0;; tries++) { + long expectedSleep; + try { + callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); + return callable.call(getRemainingTime(callTimeout)); + } catch (PreemptiveFastFailException e) { + throw e; + } catch (Throwable t) { + ExceptionUtil.rethrowIfInterrupt(t); + if (tries > startLogErrorsCnt) { + LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " + + "cancelled=" + cancelled.get() + ", msg=" + + callable.getExceptionMessageAdditionalDetail()); + } + + // translateException throws exception when should not retry: i.e. when request is bad. + interceptor.handleFailure(context, t); + t = translateException(t); + callable.throwable(t, retries != 1); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(t, + EnvironmentEdgeManager.currentTime(), toString()); + exceptions.add(qt); + if (tries >= retries - 1) { + throw new RetriesExhaustedException(tries, exceptions); + } + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be + // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + expectedSleep = callable.sleep(pause, tries + 1); + + // If, after the planned sleep, there won't be enough time left, we stop now. + long duration = singleCallDuration(expectedSleep); + if (duration > callTimeout) { + String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + + ": " + callable.getExceptionMessageAdditionalDetail(); + throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); + } + } finally { + interceptor.updateFailureInfo(context); + } + try { + if (expectedSleep > 0) { + synchronized (cancelled) { + if (cancelled.get()) return null; + cancelled.wait(expectedSleep); + } + } + if (cancelled.get()) return null; + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); + } + } + } + + /** + * @return Calculate how long a single call took + */ + private long singleCallDuration(final long expectedSleep) { + return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; + } + + @Override + public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + // The code of this method should be shared with withRetries. + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + try { + callable.prepare(false); + return callable.call(callTimeout); + } catch (Throwable t) { + Throwable t2 = translateException(t); + ExceptionUtil.rethrowIfInterrupt(t2); + // It would be nice to clear the location cache here. + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } + } + } + + /** + * Get the good or the remote exception if any, throws the DoNotRetryIOException. + * @param t the throwable to analyze + * @return the translated exception, if it's not a DoNotRetryIOException + * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. + */ + static Throwable translateException(Throwable t) throws DoNotRetryIOException { + if (t instanceof UndeclaredThrowableException) { + if (t.getCause() != null) { + t = t.getCause(); + } + } + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); + } + if (t instanceof LinkageError) { + throw new DoNotRetryIOException(t); + } + if (t instanceof ServiceException) { + ServiceException se = (ServiceException)t; + Throwable cause = se.getCause(); + if (cause != null && cause instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)cause; + } + // Don't let ServiceException out; its rpc specific. + t = cause; + // t could be a RemoteException so go aaround again. + translateException(t); + } else if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)t; + } + return t; + } + + @Override + public String toString() { + return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + + ", pause=" + pause + ", retries=" + retries + '}'; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java new file mode 100644 index 00000000000..0c7b683d79a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the statistics for multiple regions + */ +@InterfaceAudience.Private +public class ServerStatisticTracker { + + private final Map stats = + new ConcurrentHashMap(); + + public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + currentStats) { + ServerStatistics stat = stats.get(server); + + if (stat == null) { + // create a stats object and update the stats + synchronized (this) { + stat = stats.get(server); + // we don't have stats for that server yet, so we need to make some + if (stat == null) { + stat = new ServerStatistics(); + stats.put(server, stat); + } + } + } + stat.update(region, currentStats); + } + + public ServerStatistics getStats(ServerName server) { + return this.stats.get(server); + } + + public static ServerStatisticTracker create(Configuration conf) { + if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) { + return null; + } + return new ServerStatisticTracker(); + } + + @VisibleForTesting + ServerStatistics getServerStatsForTesting(ServerName server) { + return stats.get(server); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java new file mode 100644 index 00000000000..cec0ee59dec --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return, + * if stats are available + */ +@InterfaceAudience.Private +public class StatsTrackingRpcRetryingCaller implements RpcRetryingCaller { + private final ServerStatisticTracker stats; + private final RpcRetryingCaller delegate; + + public StatsTrackingRpcRetryingCaller(RpcRetryingCaller delegate, + ServerStatisticTracker stats) { + this.delegate = delegate; + this.stats = stats; + } + + @Override + public void cancel() { + delegate.cancel(); + } + + @Override + public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + T result = delegate.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + @Override + public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + T result = delegate.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + private T updateStatsAndUnwrap(T result, RetryingCallable callable) { + // don't track stats about requests that aren't to regionservers + if (!(callable instanceof RegionServerCallable)) { + return result; + } + + // mutli-server callables span multiple regions, so they don't have a location, + // but they are region server callables, so we have to handle them when we process the + // result, not in here + if (callable instanceof MultiServerCallable) { + return result; + } + + // update the stats for the single server callable + RegionServerCallable regionCallable = (RegionServerCallable) callable; + HRegionLocation location = regionCallable.getLocation(); + return ResultStatsUtil.updateStats(result, stats, location); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java new file mode 100644 index 00000000000..94e434f1c97 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Configurable policy for the amount of time a client should wait for a new request to the + * server when given the server load statistics. + *

+ * Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration} + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ClientBackoffPolicy { + + public static final String BACKOFF_POLICY_CLASS = + "hbase.client.statistics.backoff-policy"; + + /** + * @return the number of ms to wait on the client based on the + */ + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java new file mode 100644 index 00000000000..879a0e22c0d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ClientBackoffPolicyFactory { + + private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class); + + private ClientBackoffPolicyFactory() { + } + + public static ClientBackoffPolicy create(Configuration conf) { + // create the backoff policy + String className = + conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class + .getName()); + return ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); + } + + /** + * Default backoff policy that doesn't create any backoff for the client, regardless of load + */ + public static class NoBackoffPolicy implements ClientBackoffPolicy { + public NoBackoffPolicy(Configuration conf){ + // necessary to meet contract + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + return 0; + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java new file mode 100644 index 00000000000..6e75670227e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple exponential backoff policy on for the client that uses a percent^4 times the + * max backoff to generate the backoff time. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { + + private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class); + + private static final long ONE_MINUTE = 60 * 1000; + public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE; + public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max"; + private long maxBackoff; + + public ExponentialClientBackoffPolicy(Configuration conf) { + this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF); + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + // no stats for the server yet, so don't backoff + if (stats == null) { + return 0; + } + + ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region); + // no stats for the region yet - don't backoff + if (regionStats == null) { + return 0; + } + + // square the percent as a value less than 1. Closer we move to 100 percent, + // the percent moves to 1, but squaring causes the exponential curve + double percent = regionStats.getMemstoreLoadPercent() / 100.0; + double multiplier = Math.pow(percent, 4.0); + // shouldn't ever happen, but just incase something changes in the statistic data + if (multiplier > 1) { + LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " + + "down to the max backoff"); + multiplier = 1; + } + return (long) (multiplier * maxBackoff); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java new file mode 100644 index 00000000000..a3b8e11a632 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Track the statistics for a single region + */ +@InterfaceAudience.Private +public class ServerStatistics { + + private Map + stats = new TreeMap(Bytes.BYTES_COMPARATOR); + + /** + * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update, + * as something gets set + * @param region + * @param currentStats + */ + public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) { + RegionStatistics regionStat = this.stats.get(region); + if(regionStat == null){ + regionStat = new RegionStatistics(); + this.stats.put(region, regionStat); + } + + regionStat.update(currentStats); + } + + @InterfaceAudience.Private + public RegionStatistics getStatsForRegion(byte[] regionName){ + return stats.get(regionName); + } + + public static class RegionStatistics{ + private int memstoreLoad = 0; + + public void update(ClientProtos.RegionLoadStats currentStats) { + this.memstoreLoad = currentStats.getMemstoreLoad(); + } + + public int getMemstoreLoadPercent(){ + return this.memstoreLoad; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 8c75f4fa3f0..8433ceebbab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration()); + this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); this.operationTimeout = conn.getConfiguration().getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 70da40c0389..1d42a828275 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -114,17 +114,23 @@ public final class ResponseConverter { } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException())); + responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells)); + responseValue = ProtobufUtil.toResult(roe.getResult(), cells); + // add the load stats, if we got any + if (roe.hasLoadStats()) { + ((Result) responseValue).addResults(roe.getLoadStats()); + } } else if (roe.hasServiceResult()) { - results.add(regionName, roe.getIndex(), roe.getServiceResult()); + responseValue = roe.getServiceResult(); } else { // no result & no exception. Unexpected. throw new IllegalStateException("No result & no exception roe=" + roe + " for region " + actions.getRegion()); } + results.add(regionName, roe.getIndex(), responseValue); } } @@ -149,9 +155,11 @@ public final class ResponseConverter { * @param r * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, + ClientProtos.RegionLoadStats stats) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); + if(stats != null) builder.setLoadStats(stats); return builder; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d2196382f94..88a95fbfd50 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -190,7 +190,7 @@ public class TestAsyncProcess { } }); - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -208,7 +208,7 @@ public class TestAsyncProcess { } } - static class CallerWithFailure extends RpcRetryingCaller{ + static class CallerWithFailure extends RpcRetryingCallerImpl{ public CallerWithFailure() { super(100, 100, 9); @@ -294,7 +294,7 @@ public class TestAsyncProcess { replicaCalls.incrementAndGet(); } - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java new file mode 100644 index 00000000000..88e409d5baf --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestClientExponentialBackoff { + + ServerName server = Mockito.mock(ServerName.class); + byte[] regionname = Bytes.toBytes("region"); + + @Test + public void testNulls() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + assertEquals(0, backoff.getBackoffTime(null, null, null)); + + // server name doesn't matter to calculation, but check it now anyways + assertEquals(0, backoff.getBackoffTime(server, null, null)); + assertEquals(0, backoff.getBackoffTime(server, regionname, null)); + + // check when no stats for the region yet + ServerStatistics stats = new ServerStatistics(); + assertEquals(0, backoff.getBackoffTime(server, regionname, stats)); + } + + @Test + public void testMaxLoad() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + update(stats, 100); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + + // another policy with a different max timeout + long max = 100; + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max); + ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // test beyond 100 still doesn't exceed the max + update(stats, 101); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // and that when we are below 100, its less than the max timeout + update(stats, 99); + assertTrue(backoff.getBackoffTime(server, + regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max); + } + + /** + * Make sure that we get results in the order that we expect - backoff for a load of 1 should + * less than backoff for 10, which should be less than that for 50. + */ + @Test + public void testResultOrdering() { + Configuration conf = new Configuration(false); + // make the max timeout really high so we get differentiation between load factors + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long previous = backoff.getBackoffTime(server, regionname, stats); + for (int i = 1; i <= 100; i++) { + update(stats, i); + long next = backoff.getBackoffTime(server, regionname, stats); + assertTrue( + "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " + + "load " + i, previous < next); + previous = next; + } + } + + private void update(ServerStatistics stats, int load) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad + (load).build(); + stats.update(regionname, stat); + } +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java index 080cd8b5929..e82e59d3b45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java @@ -564,7 +564,7 @@ public class TestFastFailWithoutTestUtil { public RpcRetryingCaller getRpcRetryingCaller(int pauseTime, int retries, RetryingCallerInterceptor interceptor) { - return new RpcRetryingCaller(pauseTime, retries, interceptor, 9) { + return new RpcRetryingCallerImpl(pauseTime, retries, interceptor, 9) { @Override public Void callWithRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 60017670861..33b71ad01c8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1099,6 +1099,12 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + /** Config key for if the server should send backpressure and if the client should listen to + * that backpressure from the server */ + public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; + public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index c36662ecdad..ab86e1e269c 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -26210,6 +26210,482 @@ public final class ClientProtos { // @@protoc_insertion_point(class_scope:RegionAction) } + public interface RegionLoadStatsOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional int32 memstoreLoad = 1 [default = 0]; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + boolean hasMemstoreLoad(); + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + int getMemstoreLoad(); + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+   *
+   * Statistics about the current load on the region
+   * 
+ */ + public static final class RegionLoadStats extends + com.google.protobuf.GeneratedMessage + implements RegionLoadStatsOrBuilder { + // Use RegionLoadStats.newBuilder() to construct. + private RegionLoadStats(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private RegionLoadStats(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final RegionLoadStats defaultInstance; + public static RegionLoadStats getDefaultInstance() { + return defaultInstance; + } + + public RegionLoadStats getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private RegionLoadStats( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + memstoreLoad_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public RegionLoadStats parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new RegionLoadStats(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional int32 memstoreLoad = 1 [default = 0]; + public static final int MEMSTORELOAD_FIELD_NUMBER = 1; + private int memstoreLoad_; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + + private void initFields() { + memstoreLoad_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt32(1, memstoreLoad_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, memstoreLoad_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) obj; + + boolean result = true; + result = result && (hasMemstoreLoad() == other.hasMemstoreLoad()); + if (hasMemstoreLoad()) { + result = result && (getMemstoreLoad() + == other.getMemstoreLoad()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasMemstoreLoad()) { + hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER; + hash = (53 * hash) + getMemstoreLoad(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+     *
+     * Statistics about the current load on the region
+     * 
+ */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + memstoreLoad_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats build() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.memstoreLoad_ = memstoreLoad_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) return this; + if (other.hasMemstoreLoad()) { + setMemstoreLoad(other.getMemstoreLoad()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional int32 memstoreLoad = 1 [default = 0]; + private int memstoreLoad_ ; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder setMemstoreLoad(int value) { + bitField0_ |= 0x00000001; + memstoreLoad_ = value; + onChanged(); + return this; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder clearMemstoreLoad() { + bitField0_ = (bitField0_ & ~0x00000001); + memstoreLoad_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RegionLoadStats) + } + + static { + defaultInstance = new RegionLoadStats(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RegionLoadStats) + } + public interface ResultOrExceptionOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -26286,6 +26762,32 @@ public final class ClientProtos { * */ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder(); + + // optional .RegionLoadStats loadStats = 5; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + boolean hasLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder(); } /** * Protobuf type {@code ResultOrException} @@ -26389,6 +26891,19 @@ public final class ClientProtos { bitField0_ |= 0x00000008; break; } + case 42: { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder subBuilder = null; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + subBuilder = loadStats_.toBuilder(); + } + loadStats_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(loadStats_); + loadStats_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000010; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -26533,11 +27048,46 @@ public final class ClientProtos { return serviceResult_; } + // optional .RegionLoadStats loadStats = 5; + public static final int LOADSTATS_FIELD_NUMBER = 5; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + return loadStats_; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + return loadStats_; + } + private void initFields() { index_ = 0; result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance(); exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance(); serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance(); + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -26575,6 +27125,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeMessage(5, loadStats_); + } getUnknownFields().writeTo(output); } @@ -26600,6 +27153,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(5, loadStats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -26643,6 +27200,11 @@ public final class ClientProtos { result = result && getServiceResult() .equals(other.getServiceResult()); } + result = result && (hasLoadStats() == other.hasLoadStats()); + if (hasLoadStats()) { + result = result && getLoadStats() + .equals(other.getLoadStats()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -26672,6 +27234,10 @@ public final class ClientProtos { hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER; hash = (53 * hash) + getServiceResult().hashCode(); } + if (hasLoadStats()) { + hash = (37 * hash) + LOADSTATS_FIELD_NUMBER; + hash = (53 * hash) + getLoadStats().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -26783,6 +27349,7 @@ public final class ClientProtos { getResultFieldBuilder(); getExceptionFieldBuilder(); getServiceResultFieldBuilder(); + getLoadStatsFieldBuilder(); } } private static Builder create() { @@ -26811,6 +27378,12 @@ public final class ClientProtos { serviceResultBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -26867,6 +27440,14 @@ public final class ClientProtos { } else { result.serviceResult_ = serviceResultBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + if (loadStatsBuilder_ == null) { + result.loadStats_ = loadStats_; + } else { + result.loadStats_ = loadStatsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -26895,6 +27476,9 @@ public final class ClientProtos { if (other.hasServiceResult()) { mergeServiceResult(other.getServiceResult()); } + if (other.hasLoadStats()) { + mergeLoadStats(other.getLoadStats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -27374,6 +27958,159 @@ public final class ClientProtos { return serviceResultBuilder_; } + // optional .RegionLoadStats loadStats = 5; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> loadStatsBuilder_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + if (loadStatsBuilder_ == null) { + return loadStats_; + } else { + return loadStatsBuilder_.getMessage(); + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + loadStats_ = value; + onChanged(); + } else { + loadStatsBuilder_.setMessage(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder builderForValue) { + if (loadStatsBuilder_ == null) { + loadStats_ = builderForValue.build(); + onChanged(); + } else { + loadStatsBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder mergeLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (((bitField0_ & 0x00000010) == 0x00000010) && + loadStats_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) { + loadStats_ = + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder(loadStats_).mergeFrom(value).buildPartial(); + } else { + loadStats_ = value; + } + onChanged(); + } else { + loadStatsBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder clearLoadStats() { + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + onChanged(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder getLoadStatsBuilder() { + bitField0_ |= 0x00000010; + onChanged(); + return getLoadStatsFieldBuilder().getBuilder(); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + if (loadStatsBuilder_ != null) { + return loadStatsBuilder_.getMessageOrBuilder(); + } else { + return loadStats_; + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> + getLoadStatsFieldBuilder() { + if (loadStatsBuilder_ == null) { + loadStatsBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>( + loadStats_, + getParentForChildren(), + isClean()); + loadStats_ = null; + } + return loadStatsBuilder_; + } + // @@protoc_insertion_point(builder_scope:ResultOrException) } @@ -31066,6 +31803,11 @@ public final class ClientProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RegionAction_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RegionLoadStats_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RegionLoadStats_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_ResultOrException_descriptor; private static @@ -31180,31 +31922,33 @@ public final class ClientProtos { "\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" + "rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" + "egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" + - "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001\n\021Resul" + - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - "essorServiceResult\"f\n\022RegionActionResult", - "\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" + - "ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" + - "ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" + - "2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" + - "ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" + - "nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" + - "ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" + - "stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCli" + - "entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" + - "onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR", - "esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" + - "onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" + - "uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" + - "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" + - "essorServiceResponse\022R\n\027ExecRegionServer" + - "Service\022\032.CoprocessorServiceRequest\032\033.Co" + - "processorServiceResponse\022&\n\005Multi\022\r.Mult" + - "iRequest\032\016.MultiResponseBB\n*org.apache.h" + - "adoop.hbase.protobuf.generatedB\014ClientPr" + - "otosH\001\210\001\001\240\001\001" + "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017Region" + + "LoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021R" + + "esultOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006resul" + + "t\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.Na" + + "meBytesPair\0221\n\016service_result\030\004 \001(\0132\031.Co", + "processorServiceResult\022#\n\tloadStats\030\005 \001(" + + "\0132\020.RegionLoadStats\"f\n\022RegionActionResul" + + "t\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrE" + + "xception\022!\n\texception\030\002 \001(\0132\016.NameBytesP" + + "air\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(" + + "\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\t" + + "condition\030\003 \001(\0132\n.Condition\"S\n\rMultiResp" + + "onse\022/\n\022regionActionResult\030\001 \003(\0132\023.Regio" + + "nActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Cons" + + "istency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCl", + "ientService\022 \n\003Get\022\013.GetRequest\032\014.GetRes" + + "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" + + "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" + + "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe" + + "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" + + "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + + "cessorServiceResponse\022R\n\027ExecRegionServe" + + "rService\022\032.CoprocessorServiceRequest\032\033.C" + + "oprocessorServiceResponse\022&\n\005Multi\022\r.Mul" + + "tiRequest\032\016.MultiResponseBB\n*org.apache.", + "hadoop.hbase.protobuf.generatedB\014ClientP" + + "rotosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -31361,26 +32105,32 @@ public final class ClientProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionAction_descriptor, new java.lang.String[] { "Region", "Atomic", "Action", }); - internal_static_ResultOrException_descriptor = + internal_static_RegionLoadStats_descriptor = getDescriptor().getMessageTypes().get(22); + internal_static_RegionLoadStats_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RegionLoadStats_descriptor, + new java.lang.String[] { "MemstoreLoad", }); + internal_static_ResultOrException_descriptor = + getDescriptor().getMessageTypes().get(23); internal_static_ResultOrException_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ResultOrException_descriptor, - new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", }); + new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", "LoadStats", }); internal_static_RegionActionResult_descriptor = - getDescriptor().getMessageTypes().get(23); + getDescriptor().getMessageTypes().get(24); internal_static_RegionActionResult_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionActionResult_descriptor, new java.lang.String[] { "ResultOrException", "Exception", }); internal_static_MultiRequest_descriptor = - getDescriptor().getMessageTypes().get(24); + getDescriptor().getMessageTypes().get(25); internal_static_MultiRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiRequest_descriptor, new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", }); internal_static_MultiResponse_descriptor = - getDescriptor().getMessageTypes().get(25); + getDescriptor().getMessageTypes().get(26); internal_static_MultiResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiResponse_descriptor, diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index ede1c26e6a3..1a3c43e4a0f 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -353,6 +353,14 @@ message RegionAction { repeated Action action = 3; } +/* +* Statistics about the current load on the region +*/ +message RegionLoadStats{ + // percent load on the memstore. Guaranteed to be positive, between 0 and 100 + optional int32 memstoreLoad = 1 [default = 0]; +} + /** * Either a Result or an Exception NameBytesPair (keyed by * exception name whose value is the exception stringified) @@ -366,6 +374,8 @@ message ResultOrException { optional NameBytesPair exception = 3; // result if this was a coprocessor service call optional CoprocessorServiceResult service_result = 4; + // current load on the region + optional RegionLoadStats loadStats = 5; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b4b6adce1cc..31be7ad1112 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -690,7 +690,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List toRetry = new ArrayList(); Configuration conf = getConf(); - boolean success = RpcRetryingCallerFactory.instantiate(conf). newCaller() + boolean success = RpcRetryingCallerFactory.instantiate(conf, + null). newCaller() .callWithRetries(svrCallable, Integer.MAX_VALUE); if (!success) { LOG.warn("Attempt to bulk load region containing " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 95ba337d97b..428e8575a9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; @@ -122,6 +123,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -544,6 +546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final Durability durability; + private final boolean regionStatsEnabled; /** * HRegion constructor. This constructor should only be used for testing and @@ -685,6 +688,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); configurationManager = Optional.absent(); + + // disable stats tracking system tables, but check the config for everything else + this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ? + false : + conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); } void setHTableSpecificConf() { @@ -5187,18 +5197,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return results; } - public void mutateRow(RowMutations rm) throws IOException { + public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); } /** * Perform atomic mutations within the region w/o nonces. * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} */ - public void mutateRowsWithLocks(Collection mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection mutations, Collection rowsToLock) throws IOException { - mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); + return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); } /** @@ -5213,10 +5223,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ - public void mutateRowsWithLocks(Collection mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); processRowsWithLocks(proc, -1, nonceGroup, nonce); + return getRegionStats(); + } + + /** + * @return the current load statistics for the the region + */ + public ClientProtos.RegionLoadStats getRegionStats() { + if (!regionStatsEnabled) { + return null; + } + ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); + stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this + .memstoreFlushSize))); + return stats.build(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 97c698733ba..32d59d41870 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -257,8 +257,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private static ResultOrException getResultOrException( - final ClientProtos.Result r, final int index) { - return getResultOrException(ResponseConverter.buildActionResult(r), index); + final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) { + return getResultOrException(ResponseConverter.buildActionResult(r, stats), index); } private static ResultOrException getResultOrException(final Exception e, final int index) { @@ -355,7 +355,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. * @throws IOException */ - private void mutateRows(final HRegion region, final List actions, + private ClientProtos.RegionLoadStats mutateRows(final HRegion region, + final List actions, final CellScanner cellScanner) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); @@ -381,7 +382,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } - region.mutateRow(rm); + return region.mutateRow(rm); } /** @@ -661,7 +662,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case SUCCESS: builder.addResultOrException(getResultOrException( - ClientProtos.Result.getDefaultInstance(), index)); + ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats())); break; } } @@ -1815,6 +1816,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, qualifier, compareOp, comparator); } else { + ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(), + cellScanner); + // add the stats to the request + if(stats != null) { + responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() + .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); + } mutateRows(region, regionAction.getActionList(), cellScanner); processed = Boolean.TRUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index ff5f2f53ecd..59a1b4395cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -162,7 +162,7 @@ public class WALEditsReplaySink { private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, final List entries) throws IOException { try { - RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null); ReplayServerCallable callable = new ReplayServerCallable(this.conn, this.tableName, regionLoc, regionInfo, entries); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index a99b0476b12..998cdf0e49f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -134,7 +134,7 @@ public class HConnectionTestingUtility { Mockito.doNothing().when(c).decCount(); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, - RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR)); + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); HTableInterface t = Mockito.mock(HTableInterface.class); Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t); ResultScanner rs = Mockito.mock(ResultScanner.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java new file mode 100644 index 00000000000..dfb9a7078ea --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test that we can actually send and use region metrics to slowdown client writes + */ +@Category(MediumTests.class) +public class TestClientPushback { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] tableName = Bytes.toBytes("client-pushback"); + private static final byte[] family = Bytes.toBytes("f"); + private static final byte[] qualifier = Bytes.toBytes("q"); + private static long flushSizeBytes = 1024; + + @BeforeClass + public static void setupCluster() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + // enable backpressure + conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + // turn the memstore size way down so we don't need to write a lot to see changes in memstore + // load + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + flushSizeBytes); + // ensure we block the flushes when we are double that flushsize + conf.setLong("hbase.hregion.memstore.block.multiplier", 2); + + UTIL.startMiniCluster(); + UTIL.createTable(tableName, family); + } + + @AfterClass + public static void teardownCluster() throws Exception{ + UTIL.shutdownMiniCluster(); + } + + @Test + public void testClientTrackesServerPushback() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + TableName tablename = TableName.valueOf(tableName); + HTable table = new HTable(conf, tablename); + //make sure we flush after each put + table.setAutoFlushTo(true); + + // write some data + Put p = new Put(Bytes.toBytes("row")); + p.add(family, qualifier, Bytes.toBytes("value1")); + table.put(p); + + // get the stats for the region hosting our table + ClusterConnection conn = ConnectionManager.getConnectionInternal(conf); + ServerStatisticTracker stats = conn.getStatisticsTracker(); + assertNotNull( "No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + byte[] regionName = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename).get + (0).getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getServerStatsForTesting(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + assertEquals(15, regionStats.getMemstoreLoadPercent()); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 837e37bb2b3..3d1b1c80832 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -76,7 +76,7 @@ public class TestReplicasClient { private static final Log LOG = LogFactory.getLog(TestReplicasClient.class); static { - ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL); } private static final int NB_SERVERS = 1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index f5380efb43f..7ca12f03761 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -74,7 +75,7 @@ public class TestRegionReplicaReplicationEndpoint { private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class); static { - ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL); } private static final int NB_SERVERS = 2;