HBASE-7649 client retry timeout doesn't need to do x2 fallback when going to different server
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1464798 13f79535-47bb-0310-9956-ffa450edef68
parent 6debbada44
commit d747617e15
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.zookeeper.*;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -167,6 +168,8 @@ public class HConnectionManager {
   /** Default admin protocol class name. */
   public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
 
+  public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
+
   private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
 
   static {
@@ -513,10 +516,12 @@ public class HConnectionManager {
     private final Class<? extends AdminProtocol> adminClass;
     private final Class<? extends ClientProtocol> clientClass;
     private final long pause;
-    private final int numRetries;
+    private final int numTries;
     private final int maxRPCAttempts;
     private final int rpcTimeout;
     private final int prefetchRegionLimit;
+    private final boolean useServerTrackerForRetries;
+    private final long serverTrackerTimeout;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -602,7 +607,7 @@ public class HConnectionManager {
       }
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
       this.maxRPCAttempts = conf.getInt(
           HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
@@ -613,7 +618,21 @@ public class HConnectionManager {
       this.prefetchRegionLimit = conf.getInt(
           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+
+      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+      long serverTrackerTimeout = 0;
+      if (this.useServerTrackerForRetries) {
+        // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+        // However, if we are too useful, we might fail very quickly due to retry count limit.
+        // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+        // retry time if normal retries were used. Then we will retry until this time runs out.
+        // If we keep hitting one server, the net effect will be the incremental backoff, and
+        // essentially the same number of retries as planned. If we have to do faster retries,
+        // we will do more retries in aggregate, but the user will be none the wiser.
+        for (int i = 0; i < this.numTries; ++i) {
+          serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+        }
+      }
+      this.serverTrackerTimeout = serverTrackerTimeout;
       retrieveClusterId();
 
       // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
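Note: the tracker deadline computed above is just the sum of the pauses the classic retry loop would have slept through. A standalone sketch of that arithmetic, assuming a backoff table in the style of HConstants.RETRY_BACKOFF (the real ConnectionUtils.getPauseTime also adds roughly 1% random jitter):

    // Sketch only: the multipliers here are an assumption modeled on HConstants.RETRY_BACKOFF.
    static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

    static long getPauseTime(long pause, int tries) {
      // Clamp to the last multiplier once we run off the end of the table.
      int idx = Math.min(tries, RETRY_BACKOFF.length - 1);
      return pause * RETRY_BACKOFF[idx];
    }

    static long trackerTimeout(long pause, int numTries) {
      long timeout = 0;
      for (int i = 0; i < numTries; ++i) {
        timeout += getPauseTime(pause, i);
      }
      // With pause = 1000ms and numTries = 10: (1+1+1+2+2+4+4+8+16+32) * 1000 = 71000ms.
      return timeout;
    }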
@@ -772,10 +791,10 @@ public class HConnectionManager {
 
           if (exceptionCaught != null)
             // It failed. If it's not the last try, we're going to wait a little
-            if (tries < numRetries) {
+            if (tries < numTries) {
               // tries at this point is 1 or more; decrement to start from 0.
               long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
-              LOG.info("getMaster attempt " + tries + " of " + numRetries +
+              LOG.info("getMaster attempt " + tries + " of " + numTries +
                 " failed; retrying after sleep of " + pauseTime + ", exception=" + exceptionCaught);
 
               try {
@@ -788,7 +807,7 @@ public class HConnectionManager {
 
             } else {
               // Enough tries, we stop now
-              LOG.info("getMaster attempt " + tries + " of " + numRetries +
+              LOG.info("getMaster attempt " + tries + " of " + numTries +
                 " failed; no more retrying.", exceptionCaught);
               throw new MasterNotRunningException(exceptionCaught);
             }
@@ -1103,7 +1122,7 @@ public class HConnectionManager {
           return location;
         }
       }
-      int localNumRetries = retry ? numRetries : 1;
+      int localNumRetries = retry ? numTries : 1;
       // build the key of the meta region we should be looking for.
       // the extra 9's on the end are necessary to allow "exact" matches
       // without knowing the precise region names.
@@ -1112,7 +1131,7 @@ public class HConnectionManager {
       for (int tries = 0; true; tries++) {
         if (tries >= localNumRetries) {
           throw new NoServerForRegionException("Unable to find region for "
-            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
         }
 
         HRegionLocation metaLocation = null;
@@ -1210,13 +1229,13 @@ public class HConnectionManager {
         if (e instanceof RemoteException) {
           e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
         }
-        if (tries < numRetries - 1) {
+        if (tries < numTries - 1) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("locateRegionInMeta parentTable=" +
               Bytes.toString(parentTable) + ", metaLocation=" +
               ((metaLocation == null)? "null": "{" + metaLocation + "}") +
               ", attempt=" + tries + " of " +
-              this.numRetries + " failed; retrying after sleep of " +
+              this.numTries + " failed; retrying after sleep of " +
               ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
           }
         } else {
@@ -1969,6 +1988,8 @@ public class HConnectionManager {
       private final List<Action<R>> toReplay;
       private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
           inProgress;
+
+      private ServerErrorTracker errorsByServer = null;
       private int curNumRetries;
 
       // Notified when a task is done
@@ -1994,10 +2015,11 @@ public class HConnectionManager {
        * Group a list of actions per region server, and send them. The created MultiActions are
        * added to the inProgress list.
        * @param actionsList
-       * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+       * @param isRetry Whether we are retrying these actions. If yes, backoff
+       *                time may be applied before new requests.
        * @throws IOException - if we can't locate a region after multiple retries.
        */
-      private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+      private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
        // group per location => region server
        final Map<HRegionLocation, MultiAction<R>> actionsByServer =
            new HashMap<HRegionLocation, MultiAction<R>>();
@@ -2022,15 +2044,25 @@ public class HConnectionManager {
 
         // Send the queries and add them to the inProgress list
         for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+          long backoffTime = 0;
+          if (isRetry) {
+            if (hci.isUsingServerTrackerForRetries()) {
+              assert this.errorsByServer != null;
+              backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
+            } else {
+              // curNumRetries starts with one, subtract to start from 0.
+              backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
+            }
+          }
           Callable<MultiResponse> callable =
-            createDelayedCallable(sleepTime, e.getKey(), e.getValue());
-          if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+            createDelayedCallable(backoffTime, e.getKey(), e.getValue());
+          if (LOG.isTraceEnabled() && isRetry) {
            StringBuilder sb = new StringBuilder();
            for (Action<R> action : e.getValue().allActions()) {
              sb.append(Bytes.toStringBinary(action.getAction().getRow())).append(';');
            }
-            LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
-              + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+            LOG.trace("Will retry requests to [" + e.getKey().getHostnamePort()
+              + "] after delay of [" + backoffTime + "] for rows [" + sb.toString() + "]");
          }
          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
            new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
@@ -2044,9 +2076,7 @@ public class HConnectionManager {
        * @throws IOException
        */
       private void doRetry() throws IOException {
-        // curNumRetries at this point is 1 or more; decrement to start from 0.
-        final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
-        submit(this.toReplay, sleepTime);
+        submit(this.toReplay, true);
        this.toReplay.clear();
      }
 
@@ -2085,7 +2115,7 @@ public class HConnectionManager {
         }
 
         // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
-        submit(listActions, 0);
+        submit(listActions, false);
 
         // LastRetry is true if, either:
         // we had an exception 'DoNotRetry'
@@ -2094,7 +2124,7 @@ public class HConnectionManager {
         boolean lastRetry = false;
         // despite its name numRetries means number of tries. So if numRetries == 1 it means we
         // won't retry. And we compare vs. 2 in case someone set it to zero.
-        boolean noRetry = (hci.numRetries < 2);
+        boolean noRetry = (hci.numTries < 2);
 
         // Analyze and resubmit until all actions are done successfully or failed after numRetries
         while (!this.inProgress.isEmpty()) {
@@ -2112,7 +2142,7 @@ public class HConnectionManager {
           } catch (ExecutionException e) {
             exception = e;
           }
 
+          HRegionLocation location = currentTask.getSecond();
           // Error case: no result at all for this multi action. We need to redo all actions
           if (responses == null) {
             for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
@@ -2120,14 +2150,14 @@ public class HConnectionManager {
               Row row = action.getAction();
               // Do not use the exception for updating cache because it might be coming from
               // any of the regions in the MultiAction.
-              hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
+              hci.updateCachedLocations(tableName, row, null, location);
               if (noRetry) {
-                errors.add(exception, row, currentTask);
+                errors.add(exception, row, location);
               } else {
                 if (isTraceEnabled) {
-                  retriedErrors.add(exception, row, currentTask);
+                  retriedErrors.add(exception, row, location);
                 }
-                lastRetry = addToReplay(nbRetries, action);
+                lastRetry = addToReplay(nbRetries, action, location);
               }
             }
           }
@@ -2146,14 +2176,14 @@ public class HConnectionManager {
             // Failure: retry if it makes sense, else update the errors lists
             if (result == null || result instanceof Throwable) {
               Row row = correspondingAction.getAction();
-              hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+              hci.updateCachedLocations(this.tableName, row, result, location);
               if (result instanceof DoNotRetryIOException || noRetry) {
-                errors.add((Exception)result, row, currentTask);
+                errors.add((Exception)result, row, location);
               } else {
                 if (isTraceEnabled) {
-                  retriedErrors.add((Exception)result, row, currentTask);
+                  retriedErrors.add((Exception)result, row, location);
                 }
-                lastRetry = addToReplay(nbRetries, correspondingAction);
+                lastRetry = addToReplay(nbRetries, correspondingAction, location);
               }
             } else // success
               if (callback != null) {
@@ -2186,11 +2216,10 @@ public class HConnectionManager {
       private List<Row> actions = new ArrayList<Row>();
       private List<String> addresses = new ArrayList<String>();
 
-      public void add(Exception ex, Row row,
-          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+      public void add(Exception ex, Row row, HRegionLocation location) {
        exceptions.add(ex);
        actions.add(row);
-        addresses.add(obj.getSecond().getHostnamePort());
+        addresses.add(location.getHostnamePort());
      }
 
      public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
@@ -2219,17 +2248,24 @@ public class HConnectionManager {
        * Put the action that has to be retried in the Replay list.
        * @return true if we're out of numRetries and it's the last retry.
        */
-      private boolean addToReplay(int[] nbRetries, Action<R> action) {
+      private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
        this.toReplay.add(action);
        nbRetries[action.getOriginalIndex()]++;
        if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
          this.curNumRetries = nbRetries[action.getOriginalIndex()];
        }
-        // numRetries means number of tries, while curNumRetries means current number of retries.
-        // So we need to add 1 to make them comparable. And as we look for the last try we compare
-        // with '>=' and not '>'. And we need curNumRetries to mean what it says as we don't want
-        // to initialize it to 1.
-        return ((this.curNumRetries + 1) >= hci.numRetries);
+        if (hci.isUsingServerTrackerForRetries()) {
+          if (this.errorsByServer == null) {
+            this.errorsByServer = hci.createServerErrorTracker();
+          }
+          this.errorsByServer.reportServerError(source);
+          return !this.errorsByServer.canRetryMore();
+        } else {
+          // We need to add 1 to make tries and retries comparable. And as we look for
+          // the last try we compare with '>=' and not '>'. And we need curNumRetries
+          // to mean what it says as we don't want to initialize it to 1.
+          return ((this.curNumRetries + 1) >= hci.numTries);
+        }
      }
 
      /**
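The contrast between the two exit conditions in addToReplay reduces to a small sketch (the names below are illustrative, not from the patch): the tracker path stops on a wall-clock deadline, the legacy path on an attempt count.

    // Illustrative only. 'deadline' plays the role of ServerErrorTracker.canRetryUntil.
    static boolean outOfRetries(boolean byServer, long now, long deadline,
        int curNumRetries, int numTries) {
      if (byServer) {
        return now >= deadline;               // deadline-based: time budget exhausted
      }
      return (curNumRetries + 1) >= numTries; // count-based: attempts exhausted
    }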
@@ -2521,8 +2557,102 @@ public class HConnectionManager {
     void setRpcEngine(RpcClientEngine engine) {
       this.rpcEngine = engine;
     }
-  }
+
+    /**
+     * The record of errors for servers. Visible for testing.
+     */
+    @VisibleForTesting
+    static class ServerErrorTracker {
+      private final Map<HRegionLocation, ServerErrors> errorsByServer =
+          new HashMap<HRegionLocation, ServerErrors>();
+      private long canRetryUntil = 0;
+
+      public ServerErrorTracker(long timeout) {
+        LOG.info("Server tracker timeout is " + timeout + "ms");
+        this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      }
+
+      boolean canRetryMore() {
+        return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+      }
+
+      /**
+       * Calculates the back-off time for a retrying request to a particular server.
+       * This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       * @param basePause The default hci pause.
+       * @return The time to wait before sending next request.
+       */
+      long calculateBackoffTime(HRegionLocation server, long basePause) {
+        long result = 0;
+        ServerErrors errorStats = errorsByServer.get(server);
+        if (errorStats != null) {
+          result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+          // Adjust by the time we already waited since last talking to this server.
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          long timeSinceLastError = now - errorStats.getLastErrorTime();
+          if (timeSinceLastError > 0) {
+            result = Math.max(0, result - timeSinceLastError);
+          }
+          // Finally, see if the backoff time overshoots the timeout.
+          if (result > 0 && (now + result > this.canRetryUntil)) {
+            result = Math.max(0, this.canRetryUntil - now);
+          }
+        }
+        return result;
+      }
+
+      /**
+       * Reports that there was an error on the server to do whatever bean-counting necessary.
+       * This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       */
+      void reportServerError(HRegionLocation server) {
+        ServerErrors errors = errorsByServer.get(server);
+        if (errors != null) {
+          errors.addError();
+        } else {
+          errorsByServer.put(server, new ServerErrors());
+        }
+      }
+
+      /**
+       * The record of errors for a server.
+       */
+      private static class ServerErrors {
+        public long lastErrorTime;
+        public int retries;
+
+        public ServerErrors() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          this.retries = 0;
+        }
+
+        public void addError() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          ++this.retries;
+        }
+
+        public long getLastErrorTime() {
+          return this.lastErrorTime;
+        }
+      }
+    }
+
+    public boolean isUsingServerTrackerForRetries() {
+      return this.useServerTrackerForRetries;
+    }
+
+    /**
+     * Creates the server error tracker to use inside process.
+     * Currently, to preserve the main assumption about current retries, and to work well with
+     * the retry-limit-based calculation, the calculation is local per Process object.
+     * We may benefit from connection-wide tracking of server errors.
+     * @return ServerErrorTracker to use.
+     */
+    ServerErrorTracker createServerErrorTracker() {
+      return new ServerErrorTracker(this.serverTrackerTimeout);
+    }
+  }
+
   /**
    * Set the number of retries to use serverside when trying to communicate
    * with another server over {@link HConnection}. Used updating catalog
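Taken together, the tracker's contract amounts to the call pattern below; this is a hypothetical driver, not part of the patch (tryRequest is a stand-in for the actual multi-action call, and the real backoff carries ~1% jitter from ConnectionUtils.getPauseTime):

    ServerErrorTracker tracker = hci.createServerErrorTracker();
    while (tracker.canRetryMore()) {
      long backoff = tracker.calculateBackoffTime(location, hci.pause); // 0 before any error
      Thread.sleep(backoff);
      if (tryRequest(location)) {            // hypothetical request helper
        break;
      }
      tracker.reportServerError(location);   // bumps this server's retry count and timestamp
    }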
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
@@ -42,9 +43,9 @@ public abstract class IngestIntegrationTestBase {
   protected HBaseCluster cluster;
   private LoadTestTool loadTool;
 
-  protected void setUp(int numSlavesBase) throws Exception {
+  protected void setUp(int numSlavesBase, Configuration conf) throws Exception {
     tableName = this.getClass().getSimpleName();
-    util = new IntegrationTestingUtility();
+    util = (conf == null) ? new IntegrationTestingUtility() : new IntegrationTestingUtility(conf);
     LOG.info("Initializing cluster with " + numSlavesBase + " servers");
     util.initializeCluster(numSlavesBase);
     LOG.info("Done initializing cluster");
@@ -58,6 +59,10 @@ public abstract class IngestIntegrationTestBase {
     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
   }
 
+  protected void setUp(int numSlavesBase) throws Exception {
+    setUp(numSlavesBase, null);
+  }
+
   protected void tearDown() throws Exception {
     LOG.info("Restoring the cluster");
     util.restoreCluster();
@@ -27,6 +27,8 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
 import org.apache.hadoop.hbase.util.Pair;
@@ -100,7 +102,9 @@ public class IntegrationTestRebalanceAndKillServersTargeted extends IngestIntegrationTestBase {
   @Before
   @SuppressWarnings("unchecked")
   public void setUp() throws Exception {
-    super.setUp(NUM_SLAVES_BASE);
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HConnectionManager.RETRIES_BY_SERVER, "true");
+    super.setUp(NUM_SLAVES_BASE, conf);
 
     ChaosMonkey.Policy chaosPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
         CHAOS_EVERY_MS, new UnbalanceKillAndRebalanceAction());
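The test above opts in explicitly; in client code the same switch is an ordinary configuration flag (true is already the default in this patch). For example, to fall back to the classic count-based retries:

    Configuration conf = HBaseConfiguration.create();
    // RETRIES_BY_SERVER = "hbase.client.retries.by.server"
    conf.setBoolean(HConnectionManager.RETRIES_BY_SERVER, false);
    HConnection connection = HConnectionManager.getConnection(conf);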
@@ -18,10 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -42,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
@@ -55,6 +53,8 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
@@ -304,13 +304,13 @@ public class TestHCM {
 
     // Hijack the number of retry to fail immediately instead of retrying: there will be no new
     // connection to the master
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 1);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 1);
 
     // We do a put and expect the cache to be updated, even if we don't retry
     LOG.info("Put starting");
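The test rewrites a private final field through reflection; the pattern, isolated below as a standalone sketch, clears the FINAL modifier on the Field object before writing. It depends on java.lang.reflect.Field internals and no longer works on modern JDKs (roughly Java 12 and later), but is fine for the JDKs of this era.

    final class FinalFieldHack {
      // Overwrites a private final instance field named 'name' on 'target'.
      static void setFinalField(Object target, String name, Object value) throws Exception {
        java.lang.reflect.Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        // Clear the FINAL bit on the Field's own 'modifiers' so field.set() is permitted.
        java.lang.reflect.Field modifiers =
            java.lang.reflect.Field.class.getDeclaredField("modifiers");
        modifiers.setAccessible(true);
        modifiers.setInt(field, field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
        field.set(target, value);
      }
    }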
@@ -379,7 +379,7 @@ public class TestHCM {
       "Previous server was " + destServer.getServerName().getHostAndPort(),
       curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
   }
 
@@ -705,13 +705,13 @@ public class TestHCM {
       conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
 
     // Hijack the number of retry to fail after 2 tries
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 2);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 2);
 
     Put put3 = new Put(ROW_X);
     put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -722,10 +722,83 @@ public class TestHCM {
     table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
                                                  // second we get RegionMovedException.
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
     conn.close();
   }
 
+  @Test
+  public void testErrorBackoffTimeCalculation() throws Exception {
+    final long ANY_PAUSE = 1000;
+    HRegionInfo ri = new HRegionInfo(TABLE_NAME);
+    HRegionLocation location = new HRegionLocation(ri, new ServerName("127.0.0.1", 1, 0));
+    HRegionLocation diffLocation = new HRegionLocation(ri, new ServerName("127.0.0.1", 2, 0));
+
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    try {
+      long timeBase = timeMachine.currentTimeMillis();
+      long largeAmountOfTime = ANY_PAUSE * 1000;
+      HConnectionImplementation.ServerErrorTracker tracker =
+          new HConnectionImplementation.ServerErrorTracker(largeAmountOfTime);
+
+      // The default backoff is 0.
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // Check some backoff values from the HConstants sequence.
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // None of this should affect the backoff for a different location.
+      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+      tracker.reportServerError(diffLocation);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+
+      // But it should still work for a different region on the same location.
+      HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
+      HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
+
+      // Check with a different base.
+      assertEqualsWithJitter(ANY_PAUSE * 4,
+          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
+
+      // See that time from the last error is taken into account. The time shift is applied
+      // after jitter, so pass the original expected backoff as the base for jitter.
+      long timeShift = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + timeShift);
+      assertEqualsWithJitter(ANY_PAUSE * 2 - timeShift,
+          tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
+
+      // However, we should not go negative.
+      timeMachine.setValue(timeBase + ANY_PAUSE * 100);
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // We also should not go over the boundary; the last retry would land on it.
+      long timeLeft = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
+      assertTrue(tracker.canRetryMore());
+      tracker.reportServerError(location);
+      assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      timeMachine.setValue(timeBase + largeAmountOfTime);
+      assertFalse(tracker.canRetryMore());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual) {
+    assertEqualsWithJitter(expected, actual, expected);
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
+    assertTrue("Value not within jitter: " + expected + " vs " + actual,
+        Math.abs(actual - expected) <= (0.01f * jitterBase));
+  }
 }