HBASE-9843 Various fixes in client code
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536865 13f79535-47bb-0310-9956-ffa450edef68
commit 53bb2f6be7 (parent 3bebeca24d)
AsyncProcess.java:
@@ -87,7 +87,9 @@ import org.cloudera.htrace.Trace;
  */
 class AsyncProcess<CResult> {
   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
-  private final static int START_LOG_ERRORS_CNT = 4;
+  protected static final AtomicLong COUNTER = new AtomicLong();
+  protected final long id;
+  private final int startLogErrorsCnt;
   protected final HConnection hConnection;
   protected final TableName tableName;
   protected final ExecutorService pool;
@@ -97,6 +99,7 @@ class AsyncProcess<CResult> {
   protected final AtomicBoolean hasError = new AtomicBoolean(false);
   protected final AtomicLong tasksSent = new AtomicLong(0);
   protected final AtomicLong tasksDone = new AtomicLong(0);
+  protected final AtomicLong retriesCnt = new AtomicLong(0);
   protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
       new ConcurrentHashMap<String, AtomicInteger>();
   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
@@ -121,7 +124,6 @@ class AsyncProcess<CResult> {
   protected final int maxConcurrentTasksPerServer;
   protected final long pause;
   protected int numTries;
-  protected final boolean useServerTrackerForRetries;
   protected int serverTrackerTimeout;
   protected RpcRetryingCallerFactory rpcCallerFactory;
 
@@ -205,6 +207,8 @@ class AsyncProcess<CResult> {
     this.pool = pool;
     this.callback = callback;
 
+    this.id = COUNTER.incrementAndGet();
+
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -217,6 +221,11 @@ class AsyncProcess<CResult> {
     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
         HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
 
+    // A few failure is fine: region moved, then is not opened, then is overloaded. We try
+    // to have an acceptable heuristic for the number of errors we don't log.
+    // 9 was chosen because we wait for 1s at this stage.
+    this.startLogErrorsCnt = conf.getInt("hbase.client.start.log.errors.counter", 9);
+
     if (this.maxTotalConcurrentTasks <= 0) {
       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
     }
@@ -229,10 +238,6 @@ class AsyncProcess<CResult> {
         maxConcurrentTasksPerRegion);
     }
 
-    this.useServerTrackerForRetries =
-        conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
-
-    if (this.useServerTrackerForRetries) {
     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
     // However, if we are too useful, we might fail very quickly due to retry count limit.
     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
@@ -244,7 +249,7 @@ class AsyncProcess<CResult> {
     for (int i = 0; i < this.numTries; ++i) {
       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
     }
-    }
 
     this.rpcCallerFactory = rpcCaller;
   }
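Note (illustrative, not part of the patch): the constructor above replaces the hard-coded START_LOG_ERRORS_CNT with a threshold read from "hbase.client.start.log.errors.counter" (default 9). A minimal sketch of how a client could lower it to log every failed attempt, assuming a standard HBaseConfiguration; class and method names are made up:

// Illustrative sketch, not part of the patch: lowering the new threshold so that retry
// errors are logged from the first failed attempt. Key and default (9) are the ones read
// by the constructor above; everything else here is assumed boilerplate.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class VerboseRetryLogging {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // 0 means: log every failed attempt instead of only attempts beyond the 9th.
    conf.setInt("hbase.client.start.log.errors.counter", 0);
    return conf;
  }
}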
@@ -291,7 +296,7 @@ class AsyncProcess<CResult> {
     Iterator<? extends Row> it = rows.iterator();
     while (it.hasNext()) {
       Row r = it.next();
-      HRegionLocation loc = findDestLocation(r, 1, posInList);
+      HRegionLocation loc = findDestLocation(r, posInList);
 
       if (loc == null) { // loc is null if there is an error such as meta not available.
         it.remove();
@@ -332,18 +337,17 @@ class AsyncProcess<CResult> {
    * Find the destination.
    *
    * @param row the row
-   * @param numAttempt the num attempt
    * @param posInList the position in the list
    * @return the destination. Null if we couldn't find it.
    */
-  private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
-    if (row == null) throw new IllegalArgumentException("row cannot be null");
+  private HRegionLocation findDestLocation(Row row, int posInList) {
+    if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
     HRegionLocation loc = null;
     IOException locationException = null;
     try {
       loc = hConnection.locateRegion(this.tableName, row.getRow());
       if (loc == null) {
-        locationException = new IOException("No location found, aborting submit for" +
+        locationException = new IOException("#" + id + ", no location found, aborting submit for" +
           " tableName=" + tableName +
           " rowkey=" + Arrays.toString(row.getRow()));
       }
@@ -353,7 +357,7 @@ class AsyncProcess<CResult> {
     if (locationException != null) {
       // There are multiple retries in locateRegion already. No need to add new.
       // We can't continue with this row, hence it's the last retry.
-      manageError(numAttempt, posInList, row, false, locationException, null);
+      manageError(posInList, row, false, locationException, null);
       return null;
     }
 
@@ -460,12 +464,17 @@ class AsyncProcess<CResult> {
   private void submit(List<Action<Row>> initialActions,
                       List<Action<Row>> currentActions, int numAttempt,
                       final HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    if (numAttempt > 1){
+      retriesCnt.incrementAndGet();
+    }
+
     // group per location => regions server
     final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
         new HashMap<HRegionLocation, MultiAction<Row>>();
 
     for (Action<Row> action : currentActions) {
-      HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
+      HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
       if (loc != null) {
         addAction(loc, action, actionsByServer);
       }
@@ -503,7 +512,8 @@ class AsyncProcess<CResult> {
         try {
           res = createCaller(callable).callWithoutRetries(callable);
         } catch (IOException e) {
-          LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
+          LOG.warn("#" + id + ", call to " + loc.getServerName() +
+            " failed numAttempt=" + numAttempt +
             ", resubmitting all since not sure where we are at", e);
           resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
           return;
@@ -522,7 +532,7 @@ class AsyncProcess<CResult> {
       // This should never happen. But as the pool is provided by the end user, let's secure
       // this a little.
       decTaskCounters(multiAction.getRegions(), loc.getServerName());
-      LOG.warn("The task was rejected by the pool. This is unexpected." +
+      LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
         " Server is " + loc.getServerName(), ree);
       // We're likely to fail again, but this will increment the attempt counter, so it will
       // finish.
@@ -551,7 +561,6 @@ class AsyncProcess<CResult> {
   /**
    * Check that we can retry acts accordingly: logs, set the error status, call the callbacks.
    *
-   * @param numAttempt the number of this attempt
    * @param originalIndex the position in the list sent
    * @param row the row
    * @param canRetry if false, we won't retry whatever the settings.
@@ -559,14 +568,11 @@ class AsyncProcess<CResult> {
    * @param location the location, if any (can be null)
    * @return true if the action can be retried, false otherwise.
    */
-  private boolean manageError(int numAttempt, int originalIndex, Row row, boolean canRetry,
+  private boolean manageError(int originalIndex, Row row, boolean canRetry,
                               Throwable throwable, HRegionLocation location) {
-    if (canRetry) {
-      if (numAttempt >= numTries ||
-          (throwable != null && throwable instanceof DoNotRetryIOException)) {
-        canRetry = false;
-      }
-    }
+    if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
+      canRetry = false;
+    }
     byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
 
     if (canRetry && callback != null) {
@@ -608,15 +614,14 @@ class AsyncProcess<CResult> {
     List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
     for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
       for (Action<Row> action : e.getValue()) {
-        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
-            true, t, location)) {
+        if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) {
           toReplay.add(action);
         }
       }
     }
 
     if (toReplay.isEmpty()) {
-      LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
+      LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all " +
         initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
     } else {
       submit(initialActions, toReplay, numAttempt, errorsByServer);
@@ -628,7 +633,7 @@ class AsyncProcess<CResult> {
    *
    * @param initialActions - the whole action list
    * @param rsActions - the actions for this location
-   * @param location - the location
+   * @param location - the location. It's used as a server name.
    * @param responses - the response, if any
    * @param numAttempt - the attempt
    */
@@ -638,8 +643,8 @@ class AsyncProcess<CResult> {
                             HConnectionManager.ServerErrorTracker errorsByServer) {
 
     if (responses == null) {
-      LOG.info("Attempt #" + numAttempt + "/" + numTries + " failed all ops, trying resubmit," +
-        location);
+      LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
+        " failed all ops, trying resubmit," + location);
       resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
       return;
     }
@@ -670,14 +675,15 @@ class AsyncProcess<CResult> {
           failureCount++;
           if (!regionFailureRegistered) { // We're doing this once per location.
             regionFailureRegistered= true;
+            // The location here is used as a server name.
             hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
-            if (errorsByServer != null) {
+            if (failureCount == 1) {
               errorsByServer.reportServerError(location);
-              canRetry = errorsByServer.canRetryMore();
+              canRetry = errorsByServer.canRetryMore(numAttempt);
             }
           }
 
-          if (manageError(numAttempt, correspondingAction.getOriginalIndex(), row, canRetry,
+          if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
               throwable, location)) {
             toReplay.add(correspondingAction);
           }
@@ -694,21 +700,24 @@ class AsyncProcess<CResult> {
       }
 
     if (!toReplay.isEmpty()) {
-      long backOffTime = (errorsByServer != null ?
-          errorsByServer.calculateBackoffTime(location, pause) :
-          ConnectionUtils.getPauseTime(pause, numAttempt));
-      if (numAttempt > START_LOG_ERRORS_CNT && LOG.isDebugEnabled()) {
+      // We have two contradicting needs here:
+      //  1) We want to get the new location after having slept, as it may change.
+      //  2) We want to take into account the location when calculating the sleep time.
+      // It should be possible to have some heuristics to take the right decision. Short term,
+      // we go for one.
+      long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
+      if (numAttempt > startLogErrorsCnt) {
         // We use this value to have some logs when we have multiple failures, but not too many
         // logs, as errors are to be expected when a region moves, splits and so on
-        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
-          " ops , resubmitting " + toReplay.size() + ", " + location + ", last exception was: " +
-          (throwable == null ? "null" : throwable.getMessage()) +
-          ", sleeping " + backOffTime + "ms");
+        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+          location.getServerName(), throwable, backOffTime, true,
+          errorsByServer.getStartTrackingTime()));
       }
 
       try {
         Thread.sleep(backOffTime);
       } catch (InterruptedException e) {
-        LOG.warn("Not sent: " + toReplay.size() + " operations, " + location, e);
+        LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location, e);
         Thread.interrupted();
         return;
       }
@@ -717,16 +726,46 @@ class AsyncProcess<CResult> {
     } else {
       if (failureCount != 0) {
         // We have a failure but nothing to retry. We're done, it's a final failure..
-        LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
-          " ops on " + location.getServerName() + " NOT resubmitting. " + location);
-      } else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
+        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+          location.getServerName(), throwable, -1, false,
+          errorsByServer.getStartTrackingTime()));
+      } else if (numAttempt > startLogErrorsCnt + 1) {
         // The operation was successful, but needed several attempts. Let's log this.
-        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " finally suceeded, size=" +
-          toReplay.size());
+        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+          location.getServerName(), throwable, -1, false,
+          errorsByServer.getStartTrackingTime()));
       }
     }
   }
 
+  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
+                           Throwable error, long backOffTime, boolean willRetry, String startTime){
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("#").append(id).append(", table=").append(tableName).
+        append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
+
+    if (failureCount > 0 || error != null){
+      sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: ").
+          append(error == null ? "null" : error.getMessage());
+    }else {
+      sb.append("SUCCEEDED");
+    }
+
+    sb.append(" on server ").append(sn);
+
+    sb.append(", tracking started at ").append(startTime);
+
+    if (willRetry) {
+      sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
+          append(", will replay ").append(replaySize).append(" ops.");
+    } else if (failureCount > 0) {
+      sb.append(" - FAILED, NOT RETRYING ANYMORE");
+    }
+
+    return sb.toString();
+  }
+
   /**
    * Waits for another task to finish.
    * @param currentNumberOfTask - the number of task finished when calling the method.
@@ -738,7 +777,7 @@ class AsyncProcess<CResult> {
         this.tasksDone.wait(100);
       }
     } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted." +
+      throw new InterruptedIOException("#" + id + ", interrupted." +
         " currentNumberOfTask=" + currentNumberOfTask +
         ", tableName=" + tableName + ", tasksDone=" + tasksDone.get());
     }
@@ -756,9 +795,10 @@ class AsyncProcess<CResult> {
       long now = EnvironmentEdgeManager.currentTimeMillis();
       if (now > lastLog + 10000) {
         lastLog = now;
-        LOG.info(": Waiting for the global number of running tasks to be equals or less than "
+        LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
           + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
-          ", currentTasksDone=" + currentTasksDone + ", tableName=" + tableName);
+          ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
+          " hasError=" + hasError() + ", tableName=" + tableName);
       }
       waitForNextTaskDone(currentTasksDone);
       currentTasksDone = this.tasksDone.get();
@@ -848,10 +888,6 @@ class AsyncProcess<CResult> {
    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
    */
   protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
-    if (useServerTrackerForRetries){
-      return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout);
-    }else {
-      return null;
-    }
+    return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
   }
 }
HConnectionManager.java:
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.SocketException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -2638,15 +2639,23 @@ public class HConnectionManager {
     // We need a concurrent map here, as we could have multiple threads updating it in parallel.
     private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
         new ConcurrentHashMap<HRegionLocation, ServerErrors>();
-    private long canRetryUntil = 0;
+    private final long canRetryUntil;
+    private final int maxRetries;
+    private final String startTrackingTime;
 
-    public ServerErrorTracker(long timeout) {
-      LOG.trace("Server tracker timeout is " + timeout + "ms");
+    public ServerErrorTracker(long timeout, int maxRetries) {
+      this.maxRetries = maxRetries;
       this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      this.startTrackingTime = new Date().toString();
     }
 
-    boolean canRetryMore() {
-      return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+    /**
+     * We stop to retry when we have exhausted BOTH the number of retries and the time allocated.
+     */
+    boolean canRetryMore(int numRetry) {
+      // If there is a single try we must not take into account the time.
+      return numRetry < maxRetries || (maxRetries > 1 &&
+          EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil);
     }
 
     /**
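Note (illustrative, not part of the patch): the reworked canRetryMore(int) keeps retrying while either budget still holds, so retries stop only once both the retry count and the allocated time are exhausted, and the time budget is ignored when only one try is allowed. A minimal standalone sketch of that rule, with made-up names:

// Illustrative sketch, not part of the patch: the stop rule encoded by canRetryMore(int),
// written as a self-contained class. Class and field names are made up for the example.
public final class RetryBudget {
  private final int maxRetries;      // e.g. hbase.client.retries.number
  private final long canRetryUntil;  // absolute deadline, in milliseconds

  public RetryBudget(int maxRetries, long timeoutMs) {
    this.maxRetries = maxRetries;
    this.canRetryUntil = System.currentTimeMillis() + timeoutMs;
  }

  // Keep retrying while attempts remain, or, when more than one try is allowed,
  // while the deadline has not passed: we stop only when BOTH budgets are exhausted.
  public boolean canRetryMore(int numRetry) {
    return numRetry < maxRetries
        || (maxRetries > 1 && System.currentTimeMillis() < canRetryUntil);
  }
}

With maxRetries set to 1 the deadline plays no role, so a single-try configuration fails on its first unsuccessful attempt, which is what the "single try" comment in the patch refers to.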
@@ -2657,20 +2666,12 @@ public class HConnectionManager {
      * @return The time to wait before sending next request.
      */
     long calculateBackoffTime(HRegionLocation server, long basePause) {
-      long result = 0;
+      long result;
       ServerErrors errorStats = errorsByServer.get(server);
       if (errorStats != null) {
-        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
-        // Adjust by the time we already waited since last talking to this server.
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        long timeSinceLastError = now - errorStats.getLastErrorTime();
-        if (timeSinceLastError > 0) {
-          result = Math.max(0, result - timeSinceLastError);
-        }
-        // Finally, see if the backoff time overshoots the timeout.
-        if (result > 0 && (now + result > this.canRetryUntil)) {
-          result = Math.max(0, this.canRetryUntil - now);
-        }
+        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
+      } else {
+        result = 0; // yes, if the server is not in our list we don't wait before retrying.
       }
       return result;
     }
@@ -2685,29 +2686,25 @@ public class HConnectionManager {
       if (errors != null) {
         errors.addError();
       } else {
-        errorsByServer.put(server, new ServerErrors());
+        errors = errorsByServer.putIfAbsent(server, new ServerErrors());
+        if (errors != null){
+          errors.addError();
+        }
       }
     }
 
+    String getStartTrackingTime() {
+      return startTrackingTime;
+    }
+
     /**
      * The record of errors for a server.
      */
     private static class ServerErrors {
-      public long lastErrorTime;
-      public int retries;
-
-      public ServerErrors() {
-        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.retries = 0;
-      }
+      public final AtomicInteger retries = new AtomicInteger(0);
 
       public void addError() {
-        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-        ++this.retries;
-      }
-
-      public long getLastErrorTime() {
-        return this.lastErrorTime;
+        retries.incrementAndGet();
       }
     }
   }
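Note (illustrative, not part of the patch): reportServerError above moves from put() to putIfAbsent() so that two threads reporting the first error for the same server cannot silently overwrite each other's entry; the thread whose insert loses the race must still record its error on the winner's entry. A minimal sketch of the same idiom, with made-up names:

// Illustrative sketch, not part of the patch: the putIfAbsent idiom used by
// reportServerError above. If two threads race to record the first error for a server,
// only one insert wins; the loser counts its error on the entry the winner installed.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ServerErrorCounters {
  private final ConcurrentMap<String, AtomicInteger> errorsByServer =
      new ConcurrentHashMap<String, AtomicInteger>();

  void reportError(String serverName) {
    AtomicInteger errors = errorsByServer.get(serverName);
    if (errors != null) {
      errors.incrementAndGet();
    } else {
      // Mirror the patch: a freshly installed entry starts at 0 and is not incremented
      // by the thread that installed it.
      errors = errorsByServer.putIfAbsent(serverName, new AtomicInteger(0));
      if (errors != null) {
        // Another thread installed the entry first; record this error on it.
        errors.incrementAndGet();
      }
    }
  }
}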
TestAsyncProcess.java:
@@ -679,7 +679,7 @@ public class TestAsyncProcess {
     HTable ht = new HTable();
     Configuration configuration = new Configuration(conf);
     configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
-    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
     // set default writeBufferSize
     ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
 
@@ -688,24 +688,21 @@ public class TestAsyncProcess {
     ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
 
 
-    Assert.assertTrue(ht.ap.useServerTrackerForRetries);
     Assert.assertNotNull(ht.ap.createServerErrorTracker());
-    Assert.assertTrue(ht.ap.serverTrackerTimeout > 10000);
+    Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
     ht.ap.serverTrackerTimeout = 1;
 
 
     Put p = createPut(1, false);
     ht.setAutoFlush(false, false);
     ht.put(p);
 
-    long start = System.currentTimeMillis();
     try {
       ht.flushCommits();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
-    // Checking that the ErrorsServers came into play and made us stop immediately
-    Assert.assertTrue((System.currentTimeMillis() - start) < 10000);
+    // Checking that the ErrorsServers came into play and didn't make us stop immediately
+    Assert.assertEquals(ht.ap.tasksSent.get(), 3);
   }
 
   /**
@@ -733,7 +730,6 @@ public class TestAsyncProcess {
 
     ht.batch(gets);
 
-
     Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
     Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
     Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
HConstants.java:
@@ -512,9 +512,10 @@ public final class HConstants {
    * run out of array items. Retries beyond this use the last number in the array. So, for
    * example, if hbase.client.pause is 1 second, and maximum retries count
    * hbase.client.retries.number is 10, we will retry at the following intervals:
-   * 1, 2, 3, 10, 100, 100, 100, 100, 100, 100.
+   * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100.
+   * With 100ms, a back-off of 200 means 20s
    */
-  public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 100 };
+  public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };
 
   public static final String REGION_IMPL = "hbase.hregion.impl";
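Note (illustrative, not part of the patch): per the javadoc above, the wait before a retry is the base pause (hbase.client.pause) multiplied by the RETRY_BACKOFF entry for that attempt, with later attempts reusing the last entry; the real ConnectionUtils.getPauseTime may also add a small random jitter, which this sketch omits:

// Illustrative sketch, not part of the patch: the retry sleep schedule described by the
// javadoc above, i.e. base pause multiplied by RETRY_BACKOFF[attempt], with attempts past
// the end of the table reusing its last entry. Jitter is omitted.
public final class BackoffSchedule {
  static final int[] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };

  static long pauseTime(long basePauseMs, int attempt) {
    int idx = Math.min(attempt, RETRY_BACKOFF.length - 1);
    return basePauseMs * RETRY_BACKOFF[idx];
  }

  public static void main(String[] args) {
    // With the default 100 ms pause, the factor-200 entries mean a 20 s wait per retry.
    for (int attempt = 0; attempt < RETRY_BACKOFF.length; attempt++) {
      System.out.println("attempt " + attempt + " -> sleep " + pauseTime(100, attempt) + " ms");
    }
  }
}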
@@ -591,7 +592,7 @@ public final class HConstants {
   /**
    * Default value of {@link #HBASE_CLIENT_MAX_PERSERVER_TASKS}.
    */
-  public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 5;
+  public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 2;
 
   /**
    * The maximum number of concurrent connections the client will maintain to a single
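Note (illustrative, not part of the patch): DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS drops from 5 to 2 above, so clients that relied on the old per-server parallelism can raise the setting explicitly. A minimal sketch, assuming a standard client Configuration; the class and method names here are made up:

// Illustrative sketch, not part of the patch: a client that depended on the old per-server
// parallelism (5) can set it back explicitly.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ClientTuning {
  public static Configuration withOldPerServerLimit() {
    Configuration conf = HBaseConfiguration.create();
    // HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS is the key whose default drops from 5 to 2.
    conf.setInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, 5);
    return conf;
  }
}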
ClusterStatusPublisher.java:
@@ -93,13 +93,13 @@ public class ClusterStatusPublisher extends Chore {
    * We want to limit the size of the protobuf message sent, do fit into a single packet.
    * a reasonable size for ip / ethernet is less than 1Kb.
    */
-  public static int MAX_SERVER_PER_MESSAGE = 10;
+  public final static int MAX_SERVER_PER_MESSAGE = 10;
 
   /**
    * If a server dies, we're sending the information multiple times in case a receiver misses the
    * message.
    */
-  public static int NB_SEND = 5;
+  public final static int NB_SEND = 5;
 
   public ClusterStatusPublisher(HMaster master, Configuration conf,
                                 Class<? extends Publisher> publisherClass)
HRegion.java:
@@ -2534,7 +2534,13 @@ public class HRegion implements HeapSize { // , Writable{
 
     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
       requestFlush();
-      throw new RegionTooBusyException("above memstore limit");
+      throw new RegionTooBusyException("Above memstore limit, " +
+          "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+          this.getRegionInfo().getRegionNameAsString()) +
+          ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+          this.getRegionServerServices().getServerName()) +
+          ", memstoreSize=" + memstoreSize.get() +
+          ", blockingMemStoreSize=" + blockingMemStoreSize);
     }
   }
 
@@ -5358,7 +5364,11 @@ public class HRegion implements HeapSize { // , Writable{
           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
         throw new RegionTooBusyException(
-            "failed to get a lock in " + waitTime + "ms");
+            "failed to get a lock in " + waitTime + " ms. " +
+                "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+                this.getRegionInfo().getRegionNameAsString()) +
+                ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+                this.getRegionServerServices().getServerName()));
       }
     } catch (InterruptedException ie) {
       LOG.info("Interrupted while waiting for a lock");
TestHCM.java:
@@ -870,7 +870,7 @@ public class TestHCM {
       long timeBase = timeMachine.currentTimeMillis();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       HConnectionManager.ServerErrorTracker tracker =
-          new HConnectionManager.ServerErrorTracker(largeAmountOfTime);
+          new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
 
       // The default backoff is 0.
       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
@@ -912,11 +912,11 @@ public class TestHCM {
       // We also should not go over the boundary; last retry would be on it.
       long timeLeft = (long)(ANY_PAUSE * 0.5);
       timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
-      assertTrue(tracker.canRetryMore());
+      assertTrue(tracker.canRetryMore(1));
       tracker.reportServerError(location);
       assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
       timeMachine.setValue(timeBase + largeAmountOfTime);
-      assertFalse(tracker.canRetryMore());
+      assertFalse(tracker.canRetryMore(1));
     } finally {
       EnvironmentEdgeManager.reset();
     }