HBASE-9862 manage error per server and per region in the protobuffed client
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1537431 13f79535-47bb-0310-9956-ffa450edef68
commit 7f9e701bb6
parent e828b5eb69
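The core of the change: rather than resubmitting every action whenever a call goes wrong, the client now distinguishes a failure of the whole server call from a failure reported for a single region, and replays only the affected actions. The standalone sketch below (illustrative names only, not the HBase API) models that bookkeeping, assuming a response that carries an optional exception per region, as the MultiResponse change further down introduces.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the idea behind this patch: when a whole region
// reports an exception, only the actions that were sent to that region are replayed.
public class PerRegionReplaySketch {
  public static void main(String[] args) {
    // The actions we sent, grouped by region (stands in for MultiAction.actions).
    Map<String, List<String>> sentActions = new HashMap<String, List<String>>();
    sentActions.put("region-a", Arrays.asList("put-1", "put-2"));
    sentActions.put("region-b", Arrays.asList("put-3"));

    // The server answered, but region-b failed as a whole (e.g. it is being moved).
    Map<String, Throwable> regionExceptions = new HashMap<String, Throwable>();
    regionExceptions.put("region-b", new RuntimeException("NotServingRegionException"));

    // Replay only the actions of the failed regions; region-a's writes are not resent.
    List<String> toReplay = new ArrayList<String>();
    for (Map.Entry<String, Throwable> failure : regionExceptions.entrySet()) {
      toReplay.addAll(sentActions.get(failure.getKey()));
    }
    System.out.println("replaying " + toReplay); // prints: replaying [put-3]
  }
}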
@@ -512,14 +512,23 @@ class AsyncProcess<CResult> {
          try {
            res = createCaller(callable).callWithoutRetries(callable);
          } catch (IOException e) {
-            LOG.warn("#" + id + ", call to " + loc.getServerName() +
-                " failed numAttempt=" + numAttempt +
-                ", resubmitting all since not sure where we are at", e);
-            resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
+            // The service itself failed. It may be an error coming from the communication
+            // layer, but, as well, a functional error raised by the server.
+            receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
+                errorsByServer);
            return;
+          } catch (Throwable t) {
+            // This should not happen. Let's log & retry anyway.
+            LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+                " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
+            receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
+                errorsByServer);
+            return;
          }
+
+          // Nominal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);

        } finally {
          decTaskCounters(multiAction.getRegions(), loc.getServerName());
        }
@@ -536,7 +545,7 @@ class AsyncProcess<CResult> {
              " Server is " + loc.getServerName(), ree);
          // We're likely to fail again, but this will increment the attempt counter, so it will
          // finish.
-          resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
+          receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
        }
      }
    }
@@ -603,14 +612,15 @@ class AsyncProcess<CResult> {
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
-  private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
-                           HRegionLocation location, int numAttempt, Throwable t,
-                           HConnectionManager.ServerErrorTracker errorsByServer) {
+  private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
+                                    HRegionLocation location, int numAttempt, Throwable t,
+                                    HConnectionManager.ServerErrorTracker errorsByServer) {
    // Do not use the exception for updating cache because it might be coming from
    // any of the regions in the MultiAction.
    hConnection.updateCachedLocations(tableName,
        rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
    errorsByServer.reportServerError(location);

    List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
    for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
      for (Action<Row> action : e.getValue()) {
@@ -620,34 +630,77 @@ class AsyncProcess<CResult> {
      }
    }

+    logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
+        t, errorsByServer);
+  }
+
+  /**
+   * Log as much info as possible, and, if there is something to replay, submit it again after
+   * a back off sleep.
+   */
+  private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
+                              List<Action<Row>> toReplay, int numAttempt, int failureCount,
+                              Throwable throwable,
+                              HConnectionManager.ServerErrorTracker errorsByServer) {
+
    if (toReplay.isEmpty()) {
-      LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all " +
-          initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
-    } else {
-      submit(initialActions, toReplay, numAttempt, errorsByServer);
+      // it's either a success or a last failure
+      if (failureCount != 0) {
+        // We have a failure but nothing to retry. We're done, it's a final failure.
+        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+            oldLocation.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
+      } else if (numAttempt > startLogErrorsCnt + 1) {
+        // The operation was successful, but needed several attempts. Let's log this.
+        LOG.info(createLog(numAttempt, failureCount, 0,
+            oldLocation.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
+      }
+      return;
    }
+
+    // We have something to replay. We're going to sleep a little before.
+
+    // We have two contradicting needs here:
+    // 1) We want to get the new location after having slept, as it may change.
+    // 2) We want to take into account the location when calculating the sleep time.
+    // It should be possible to have some heuristics to take the right decision. Short term,
+    // we go for one.
+    long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
+
+    if (numAttempt > startLogErrorsCnt) {
+      // We use this value to have some logs when we have multiple failures, but not too many
+      // logs, as errors are to be expected when a region moves, splits and so on
+      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+          oldLocation.getServerName(), throwable, backOffTime, true,
+          errorsByServer.getStartTrackingTime()));
+    }
+
+    try {
+      Thread.sleep(backOffTime);
+    } catch (InterruptedException e) {
+      LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
+      Thread.interrupted();
+      return;
+    }
+
+    submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
  }

  /**
   * Called when we receive the result of a server query.
   *
   * @param initialActions - the whole action list
-   * @param rsActions - the actions for this location
+   * @param multiAction - the multiAction we sent
   * @param location - the location. It's used as a server name.
   * @param responses - the response, if any
   * @param numAttempt - the attempt
   */
-  private void receiveMultiAction(List<Action<Row>> initialActions,
-                                  MultiAction<Row> rsActions, HRegionLocation location,
+  private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
+                                  HRegionLocation location,
                                  MultiResponse responses, int numAttempt,
                                  HConnectionManager.ServerErrorTracker errorsByServer) {
-
-    if (responses == null) {
-      LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
-          " failed all ops, trying resubmit," + location);
-      resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
-      return;
-    }
+    assert responses != null;

    // Success or partial success
    // Analyze detailed results. We can still have individual failures to be redone.
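logAndResubmit sleeps before resubmitting, and the delay comes from errorsByServer.calculateBackoffTime(oldLocation, pause), so a server that keeps failing backs the client off more and more. The tracker's formula is not part of this diff; the sketch below is only a plausible illustration of such a policy (exponential growth per recorded error, with a cap), not the actual HConnectionManager.ServerErrorTracker implementation.

import java.util.HashMap;
import java.util.Map;

// Hypothetical backoff policy: exponential in the number of errors recorded for a server,
// with a hard cap. Illustration only; the real formula lives in ServerErrorTracker.
final class BackoffSketch {
  private final Map<String, Integer> errorsPerServer = new HashMap<String, Integer>();

  void reportServerError(String serverName) {
    Integer count = errorsPerServer.get(serverName);
    errorsPerServer.put(serverName, count == null ? 1 : count + 1);
  }

  long calculateBackoffTime(String serverName, long basePause) {
    Integer count = errorsPerServer.get(serverName);
    int errors = count == null ? 0 : count;
    long backoff = basePause * (1L << Math.min(errors, 6)); // 1x, 2x, 4x ... capped at 64x
    return Math.min(backoff, 60000L);                       // never sleep more than a minute
  }
}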
@@ -657,9 +710,9 @@ class AsyncProcess<CResult> {

    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
    Throwable throwable = null;

    int failureCount = 0;
    boolean canRetry = true;

    for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
        responses.getResults().entrySet()) {
@@ -699,43 +752,36 @@ class AsyncProcess<CResult> {
      }
    }

-    if (!toReplay.isEmpty()) {
-      // We have two contradicting needs here:
-      // 1) We want to get the new location after having slept, as it may change.
-      // 2) We want to take into account the location when calculating the sleep time.
-      // It should be possible to have some heuristics to take the right decision. Short term,
-      // we go for one.
-      long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
-      if (numAttempt > startLogErrorsCnt) {
-        // We use this value to have some logs when we have multiple failures, but not too many
-        // logs, as errors are to be expected when a region moves, splits and so on
-        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, backOffTime, true,
-            errorsByServer.getStartTrackingTime()));
+    // The failures global to a region. We will use the multiAction we sent previously to find
+    // the actions to replay.
+    for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
+      throwable = throwableEntry.getValue();
+      byte[] region = throwableEntry.getKey();
+      List<Action<Row>> actions = multiAction.actions.get(region);
+      if (actions == null || actions.isEmpty()) {
+        throw new IllegalStateException("Wrong response for the region: " +
+            HRegionInfo.encodeRegionName(region));
      }

-      try {
-        Thread.sleep(backOffTime);
-      } catch (InterruptedException e) {
-        LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location, e);
-        Thread.interrupted();
-        return;
+      if (failureCount == 0) {
+        errorsByServer.reportServerError(location);
+        canRetry = errorsByServer.canRetryMore(numAttempt);
      }
+      hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
+          throwable, location);
+      failureCount += actions.size();

-      submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
-    } else {
-      if (failureCount != 0) {
-        // We have a failure but nothing to retry. We're done, it's a final failure.
-        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, -1, false,
-            errorsByServer.getStartTrackingTime()));
-      } else if (numAttempt > startLogErrorsCnt + 1) {
-        // The operation was successful, but needed several attempts. Let's log this.
-        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, -1, false,
-            errorsByServer.getStartTrackingTime()));
+      for (Action<Row> action : actions) {
+        Row row = action.getAction();
+        if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
+          toReplay.add(action);
+        }
      }
    }

+    logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
+        throwable, errorsByServer);
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
@@ -40,6 +40,13 @@ public class MultiResponse {
  private Map<byte[], List<Pair<Integer, Object>>> results =
      new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);

+  /**
+   * The server can send us a failure for the region itself, instead of individual failures.
+   * It's a part of the protobuf definition.
+   */
+  private Map<byte[], Throwable> exceptions =
+      new TreeMap<byte[], Throwable>(Bytes.BYTES_COMPARATOR);
+
  public MultiResponse() {
    super();
  }

@@ -80,4 +87,19 @@ public class MultiResponse {
  public Map<byte[], List<Pair<Integer, Object>>> getResults() {
    return results;
  }

+  public void addException(byte []regionName, Throwable ie){
+    exceptions.put(regionName, ie);
+  }
+
+  /**
+   * @return the exception for the region, if any. Null otherwise.
+   */
+  public Throwable getException(byte []regionName){
+    return exceptions.get(regionName);
+  }
+
+  public Map<byte[], Throwable> getExceptions() {
+    return exceptions;
+  }
}
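A hypothetical usage fragment (not part of the patch) showing how the new accessors fit together: per-action outcomes still go through add()/getResults(), while a failure of the whole region is recorded once with addException() and looked up by region name. The import below assumes the class keeps its usual client package; adjust if it lives elsewhere.

import java.io.IOException;
// Assumed package for the class edited above.
import org.apache.hadoop.hbase.client.MultiResponse;

public class MultiResponseUsageSketch {
  public static void main(String[] args) {
    MultiResponse response = new MultiResponse();
    byte[] regionName = "testRegion".getBytes();

    // A failure of the whole region is recorded once, instead of once per action.
    response.addException(regionName, new IOException("region is not online"));

    // Null means "no region-level failure"; otherwise every action sent to the region
    // is a candidate for replay.
    if (response.getException(regionName) != null) {
      System.out.println("replay every action sent to this region");
    } else {
      System.out.println("per-action results: " + response.getResults().get(regionName));
    }
  }
}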
@@ -91,28 +91,12 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
    try {
      responseProto = getStub().multi(controller, requestProto);
    } catch (ServiceException e) {
-      return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
+      throw ProtobufUtil.getRemoteException(e);
    }
    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
  }

-  /**
-   * @param request
-   * @param t
-   * @return Return a response that has every action in request failed w/ the passed in
-   * exception <code>t</code> -- this will get them all retried after some backoff.
-   */
-  private static MultiResponse createAllFailedResponse(final ClientProtos.MultiRequest request,
-      final Throwable t) {
-    MultiResponse massFailedResponse = new MultiResponse();
-    for (RegionAction rAction: request.getRegionActionList()) {
-      byte [] regionName = rAction.getRegion().getValue().toByteArray();
-      for (ClientProtos.Action action: rAction.getActionList()) {
-        massFailedResponse.add(regionName, new Pair<Integer, Object>(action.getIndex(), t));
-      }
-    }
-    return massFailedResponse;
-  }
-

  /**
   * @return True if we should send data in cellblocks. This is an expensive call. Cache the
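With createAllFailedResponse gone, a ServiceException from the multi RPC is rethrown as the IOException unwrapped by ProtobufUtil.getRemoteException, so the caller sees a single failure for the whole call instead of a response in which every action is pre-marked as failed. A minimal toy sketch of that control flow, using plain Java types rather than the real stub:

import java.io.IOException;

// Toy model (no HBase types): the callable rethrows instead of faking an all-failed response.
class MultiCallSketch {
  // Stand-in for getStub().multi(...): either returns results or fails wholesale.
  static String callServer(boolean serverUp) throws IOException {
    if (!serverUp) {
      throw new IOException("connection refused");
    }
    return "per-action results";
  }

  public static void main(String[] args) {
    try {
      System.out.println(callServer(false));
    } catch (IOException e) {
      // Roughly what AsyncProcess's catch (IOException e) branch in the first hunk now sees:
      // one global failure for the server, routed to receiveGlobalFailure.
      System.out.println("global failure, deciding per action what to replay: " + e.getMessage());
    }
  }
}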
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
@@ -94,13 +95,17 @@ public final class ResponseConverter {
    for (int i = 0; i < responseRegionActionResultCount; i++) {
      RegionAction actions = request.getRegionAction(i);
      RegionActionResult actionResult = response.getRegionActionResult(i);
-      byte[] regionName = actions.getRegion().toByteArray();
+      HBaseProtos.RegionSpecifier rs = actions.getRegion();
+      if (rs.hasType() &&
+          (rs.getType() != HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)){
+        throw new IllegalArgumentException(
+            "We support only encoded types for protobuf multi response.");
+      }
+      byte[] regionName = rs.getValue().toByteArray();

      if (actionResult.hasException()){
        Throwable regionException = ProtobufUtil.toException(actionResult.getException());
-        for (ClientProtos.Action a : actions.getActionList()){
-          results.add(regionName, new Pair<Integer, Object>(a.getIndex(), regionException));
-        }
+        results.addException(regionName, regionException);
        continue;
      }
