HBASE-10794 multi-get should handle replica location missing from cache

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-04-10 20:57:59 +00:00 committed by Enis Soztutar
parent 61bce90362
commit 579f305bd0
7 changed files with 273 additions and 171 deletions

View File

@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -346,7 +347,15 @@ class AsyncProcess {
Row r = it.next(); Row r = it.next();
HRegionLocation loc; HRegionLocation loc;
try { try {
loc = findDestLocation(tableName, r, true).getDefaultRegionLocation(); if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
// Make sure we get 0-s replica.
RegionLocations locs = hConnection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
throw new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow()));
}
loc = locs.getDefaultRegionLocation();
} catch (IOException ex) { } catch (IOException ex) {
locationErrors = new ArrayList<Exception>(); locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>(); locationErrorRows = new ArrayList<Integer>();
@ -381,10 +390,11 @@ class AsyncProcess {
for (int i = 0; i < locationErrors.size(); ++i) { for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i); int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction(); Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row, false, locationErrors.get(i), null); ars.manageError(originalIndex, row,
Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
} }
} }
ars.sendMultiAction(actionsByServer, 1, null); ars.sendMultiAction(actionsByServer, 1, null, false);
return ars; return ars;
} }
@ -410,24 +420,6 @@ class AsyncProcess {
multiAction.add(regionName, action); multiAction.add(regionName, action);
} }
/**
* Find the destination.
* @param tableName the requisite table.
* @param row the row
* @return the destination.
*/
private RegionLocations findDestLocation(
TableName tableName, Row row, boolean checkPrimary) throws IOException {
if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
if (loc == null
|| (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
throw new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
}
return loc;
}
/** /**
* Check if we should send new operations to this region or region server. * Check if we should send new operations to this region or region server.
* We're taking into account the past decision; if we have already accepted * We're taking into account the past decision; if we have already accepted
@ -585,17 +577,29 @@ class AsyncProcess {
if (done) return; // Done within primary timeout if (done) return; // Done within primary timeout
Map<ServerName, MultiAction<Row>> actionsByServer = Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>(); new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
if (replicaGetIndices == null) { if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) { for (int i = 0; i < results.length; ++i) {
addReplicaActions(i, actionsByServer); addReplicaActions(i, actionsByServer, unknownLocActions);
} }
} else { } else {
for (int i = 0; i < replicaGetIndices.length; ++i) { for (int i = 0; i < replicaGetIndices.length; ++i) {
addReplicaActions(replicaGetIndices[i], actionsByServer); addReplicaActions(replicaGetIndices[i], actionsByServer, unknownLocActions);
}
}
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
}
if (!unknownLocActions.isEmpty()) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownLocActions) {
addReplicaActionsAgain(action, actionsByServer);
}
// Some actions may have completely failed, they are handled inside addAgain.
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, 1, null, true);
} }
} }
if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
sendMultiAction(actionsByServer, 1, null);
} }
/** /**
@ -603,33 +607,14 @@ class AsyncProcess {
* @param index Index of the original action. * @param index Index of the original action.
* @param actionsByServer The map by server to add it to. * @param actionsByServer The map by server to add it to.
*/ */
private void addReplicaActions( private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
int index, Map<ServerName, MultiAction<Row>> actionsByServer) { List<Action<Row>> unknownReplicaActions) {
if (results[index] != null) return; // opportunistic. Never goes from non-null to null. if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
Action<Row> action = initialActions.get(index); Action<Row> action = initialActions.get(index);
RegionLocations loc = null; RegionLocations loc = findAllLocationsOrFail(action, true);
try { if (loc == null) return;
// For perf, we assume that this location coming from cache, since we just got location
// from meta for the primary call. If it turns out to not be the case, we'd need local
// cache since we want to keep as little time as possible before replica call.
loc = findDestLocation(tableName, action.getAction(), false);
} catch (IOException ex) {
manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
LOG.error("Cannot get location - no replica calls for some actions", ex);
return;
}
HRegionLocation[] locs = loc.getRegionLocations(); HRegionLocation[] locs = loc.getRegionLocations();
int replicaCount = 0; if (locs.length == 1) {
for (int i = 1; i < locs.length; ++i) {
replicaCount += (locs[i] != null) ? 1 : 0;
}
if (replicaCount == 0) {
// we could have got the replica back (if the server went down and the replica moved)
try {
loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true);
} catch (IOException e) {
manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
}
LOG.warn("No replicas found for " + action.getAction()); LOG.warn("No replicas found for " + action.getAction());
return; return;
} }
@ -639,14 +624,30 @@ class AsyncProcess {
// but that would require additional synchronization w.r.t. returning to caller. // but that would require additional synchronization w.r.t. returning to caller.
if (results[index] != null) return; if (results[index] != null) return;
// We set the number of calls here. After that any path must call setResult/setError. // We set the number of calls here. After that any path must call setResult/setError.
results[index] = new ReplicaResultState(replicaCount + 1); // True even for replicas that are not found - if we refuse to send we MUST set error.
results[index] = new ReplicaResultState(locs.length);
} }
for (int i = 1; i < locs.length; ++i) { for (int i = 1; i < locs.length; ++i) {
if (locs[i] == null) continue; Action<Row> replicaAction = new Action<Row>(action, i);
addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), if (locs[i] != null) {
new Action<Row>(action, i), actionsByServer, nonceGroup); addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
replicaAction, actionsByServer, nonceGroup);
} else {
unknownReplicaActions.add(replicaAction);
}
} }
} }
private void addReplicaActionsAgain(
Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
throw new AssertionError("Cannot have default replica here");
}
HRegionLocation loc = getReplicaLocationOrFail(action);
if (loc == null) return;
addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
action, actionsByServer, nonceGroup);
}
} }
/** /**
@ -797,22 +798,14 @@ class AsyncProcess {
* @param numAttempt - the current numAttempt (first attempt is 1) * @param numAttempt - the current numAttempt (first attempt is 1)
*/ */
private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) { private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
// group per location => regions server Map<ServerName, MultiAction<Row>> actionsByServer =
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>(); new HashMap<ServerName, MultiAction<Row>>();
boolean isReplica = false; boolean isReplica = false;
List<Action<Row>> unknownReplicaActions = null;
for (Action<Row> action : currentActions) { for (Action<Row> action : currentActions) {
RegionLocations locs = null; RegionLocations locs = findAllLocationsOrFail(action, true);
try { if (locs == null) continue;
locs = findDestLocation(tableName, action.getAction(), false);
} catch (IOException ex) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
continue;
}
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
if (isReplica && !isReplicaAction) { if (isReplica && !isReplicaAction) {
// This is the property of the current implementation, not a requirement. // This is the property of the current implementation, not a requirement.
@ -821,34 +814,89 @@ class AsyncProcess {
isReplica = isReplicaAction; isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) { if (loc == null || loc.getServerName() == null) {
try { if (isReplica) {
locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId()); if (unknownReplicaActions == null) {
loc = locs.getRegionLocation(action.getReplicaId()); unknownReplicaActions = new ArrayList<Action<Row>>();
} catch (IOException e) { }
// There are multiple retries in locateRegion already. No need to add new. unknownReplicaActions.add(action);
// We can't continue with this row, hence it's the last retry. } else {
manageError(action.getOriginalIndex(), action.getAction(), false, e, null); // TODO: relies on primary location always being fetched
continue; manageLocationError(action, null);
}
if (loc == null || loc.getServerName() == null) {
// On retry, we couldn't find location for some replica we saw before.
String str = "Cannot find location for replica " + action.getReplicaId();
LOG.error(str);
manageError(action.getOriginalIndex(), action.getAction(),
false, new IOException(str), null);
continue;
} }
} else {
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
} }
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
} }
// If this is a first attempt to group and send, no replicas, we need replica thread. boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
if (!actionsByServer.isEmpty()) { if (!actionsByServer.isEmpty()) {
boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); // If this is a first attempt to group and send, no replicas, we need replica thread.
sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null); sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
? currentActions : null, numAttempt > 1 && !hasUnknown);
}
if (hasUnknown) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownReplicaActions) {
HRegionLocation loc = getReplicaLocationOrFail(action);
if (loc == null) continue;
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
}
if (!actionsByServer.isEmpty()) {
sendMultiAction(
actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
}
} }
} }
private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
// We are going to try get location once again. For each action, we'll do it once
// from cache, because the previous calls in the loop might populate it.
int replicaId = action.getReplicaId();
RegionLocations locs = findAllLocationsOrFail(action, true);
if (locs == null) return null; // manageError already called
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null || loc.getServerName() == null) {
locs = findAllLocationsOrFail(action, false);
if (locs == null) return null; // manageError already called
loc = locs.getRegionLocation(replicaId);
}
if (loc == null || loc.getServerName() == null) {
manageLocationError(action, null);
return null;
}
return loc;
}
private void manageLocationError(Action<Row> action, Exception ex) {
String msg = "Cannot get replica " + action.getReplicaId()
+ " location for " + action.getAction();
LOG.error(msg);
if (ex == null) {
ex = new IOException(msg);
}
manageError(action.getOriginalIndex(), action.getAction(),
Retry.NO_LOCATION_PROBLEM, ex, null);
}
private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
", row cannot be null");
RegionLocations loc = null;
try {
loc = hConnection.locateRegion(
tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
} catch (IOException ex) {
manageLocationError(action, ex);
}
return loc;
}
/** /**
* Send a multi action structure to the servers, after a delay depending on the attempt * Send a multi action structure to the servers, after a delay depending on the attempt
* number. Asynchronous. * number. Asynchronous.
@ -858,7 +906,7 @@ class AsyncProcess {
* @param actionsForReplicaThread original actions for replica thread; null on non-first call. * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/ */
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer, private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
int numAttempt, List<Action<Row>> actionsForReplicaThread) { int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
// Run the last item on the same thread if we are already on a send thread. // Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads. // We hope most of the time it will be the only item, so we can cut down on threads.
int actionsRemaining = actionsByServer.size(); int actionsRemaining = actionsByServer.size();
@ -869,8 +917,7 @@ class AsyncProcess {
incTaskCounters(multiAction.getRegions(), server); incTaskCounters(multiAction.getRegions(), server);
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server)); new SingleServerRequestRunnable(multiAction, numAttempt, server));
--actionsRemaining; if ((--actionsRemaining == 0) && reuseThread) {
if ((numAttempt > 1) && actionsRemaining == 0) {
runnable.run(); runnable.run();
} else { } else {
try { try {
@ -923,21 +970,19 @@ class AsyncProcess {
* @param server the location, if any (can be null) * @param server the location, if any (can be null)
* @return true if the action can be retried, false otherwise. * @return true if the action can be retried, false otherwise.
*/ */
public boolean manageError(int originalIndex, Row row, boolean canRetry, public Retry manageError(int originalIndex, Row row, Retry canRetry,
Throwable throwable, ServerName server) { Throwable throwable, ServerName server) {
if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) { if (canRetry == Retry.YES
canRetry = false; && throwable != null && throwable instanceof DoNotRetryIOException) {
canRetry = Retry.NO_NOT_RETRIABLE;
} }
if (!canRetry) { if (canRetry != Retry.YES) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this. // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
setError(originalIndex, row, throwable, server); setError(originalIndex, row, throwable, server);
} else { } else if (isActionComplete(originalIndex, row)) {
// See if we are dealing with a replica action that was completed from other server. canRetry = Retry.NO_OTHER_SUCCEEDED;
// Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
canRetry = !isActionComplete(originalIndex, row);
} }
return canRetry; return canRetry;
} }
@ -952,8 +997,12 @@ class AsyncProcess {
private void receiveGlobalFailure( private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) { MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
errorsByServer.reportServerError(server); errorsByServer.reportServerError(server);
boolean canRetry = errorsByServer.canRetryMore(numAttempt); Retry canRetry = errorsByServer.canRetryMore(numAttempt)
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
int failed = 0, stopped = 0;
boolean isReplica = false;
boolean firstAction = false;
List<Action<Row>> toReplay = new ArrayList<Action<Row>>(); List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) { for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey(); byte[] regionName = e.getKey();
@ -963,35 +1012,36 @@ class AsyncProcess {
// TODO: depending on type of exception we might not want to update cache at all? // TODO: depending on type of exception we might not want to update cache at all?
hConnection.updateCachedLocations(tableName, regionName, row, null, server); hConnection.updateCachedLocations(tableName, regionName, row, null, server);
for (Action<Row> action : e.getValue()) { for (Action<Row> action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) { if (firstAction) {
firstAction = false;
isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
}
Retry retry = manageError(
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
if (retry == Retry.YES) {
toReplay.add(action); toReplay.add(action);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
++stopped;
} else {
++failed;
} }
} }
} }
logAndResubmit(server, toReplay, numAttempt, rsActions.size(), t); if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, rsActions.size(), t, isReplica, failed, stopped);
} else {
resubmit(server, toReplay, numAttempt, rsActions.size(), t, isReplica);
}
} }
/** /**
* Log as much info as possible, and, if there is something to replay, * Log as much info as possible, and, if there is something to replay,
* submit it again after a back off sleep. * submit it again after a back off sleep.
* @param isReplica
*/ */
private void logAndResubmit(ServerName oldServer, List<Action<Row>> toReplay, private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
int numAttempt, int failureCount, Throwable throwable) { int numAttempt, int failureCount, Throwable throwable, boolean isReplica) {
if (toReplay.isEmpty()) {
// it's either a success or a last failure
if (failureCount != 0) {
// We have a failure but nothing to retry. We're done, it's a final failure..
LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime()));
} else if (numAttempt > startLogErrorsCnt + 1) {
// The operation was successful, but needed several attempts. Let's log this.
LOG.info(createLog(numAttempt, failureCount, 0,
oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime()));
}
return;
}
// We have something to replay. We're going to sleep a little before. // We have something to replay. We're going to sleep a little before.
// We have two contradicting needs here: // We have two contradicting needs here:
@ -1004,7 +1054,7 @@ class AsyncProcess {
// We use this value to have some logs when we have multiple failures, but not too many // We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on // logs, as errors are to be expected when a region moves, splits and so on
LOG.info(createLog(numAttempt, failureCount, toReplay.size(), LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
oldServer, throwable, backOffTime, true, errorsByServer.getStartTrackingTime())); oldServer, throwable, backOffTime, true, null, isReplica, -1, -1));
} }
try { try {
@ -1018,6 +1068,21 @@ class AsyncProcess {
groupAndSendMultiAction(toReplay, numAttempt + 1); groupAndSendMultiAction(toReplay, numAttempt + 1);
} }
private void logNoResubmit(ServerName oldServer, int numAttempt,
int failureCount, Throwable throwable, boolean isReplica, int failed, int stopped) {
if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
throwable, -1, false, timeStr, isReplica, failed, stopped);
if (failed != 0) {
// Only log final failures as warning
LOG.warn(logMessage);
} else {
LOG.info(logMessage);
}
}
}
/** /**
* Called when we receive the result of a server query. * Called when we receive the result of a server query.
* *
@ -1042,6 +1107,9 @@ class AsyncProcess {
boolean canRetry = true; boolean canRetry = true;
// Go by original action. // Go by original action.
int failed = 0, stopped = 0;
boolean isReplica = false;
boolean firstAction = false;
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) { for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey(); byte[] regionName = regionEntry.getKey();
Map<Integer, Object> regionResults = responses.getResults().get(regionName); Map<Integer, Object> regionResults = responses.getResults().get(regionName);
@ -1055,6 +1123,10 @@ class AsyncProcess {
} }
boolean regionFailureRegistered = false; boolean regionFailureRegistered = false;
for (Action<Row> sentAction : regionEntry.getValue()) { for (Action<Row> sentAction : regionEntry.getValue()) {
if (firstAction) {
firstAction = false;
isReplica = !RegionReplicaUtil.isDefaultReplica(sentAction.getReplicaId());
}
Object result = regionResults.get(sentAction.getOriginalIndex()); Object result = regionResults.get(sentAction.getOriginalIndex());
// Failure: retry if it's make sense else update the errors lists // Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) { if (result == null || result instanceof Throwable) {
@ -1071,9 +1143,14 @@ class AsyncProcess {
canRetry = errorsByServer.canRetryMore(numAttempt); canRetry = errorsByServer.canRetryMore(numAttempt);
} }
++failureCount; ++failureCount;
if (manageError( Retry retry = manageError(sentAction.getOriginalIndex(), row,
sentAction.getOriginalIndex(), row, canRetry, (Throwable)result, server)) { canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
if (retry == Retry.YES) {
toReplay.add(sentAction); toReplay.add(sentAction);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
++stopped;
} else {
++failed;
} }
} else { } else {
if (callback != null) { if (callback != null) {
@ -1111,39 +1188,58 @@ class AsyncProcess {
failureCount += actions.size(); failureCount += actions.size();
for (Action<Row> action : actions) { for (Action<Row> action : actions) {
if (firstAction) {
firstAction = false;
isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
}
Row row = action.getAction(); Row row = action.getAction();
if (manageError(action.getOriginalIndex(), row, canRetry, throwable, server)) { Retry retry = manageError(action.getOriginalIndex(), row,
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
if (retry == Retry.YES) {
toReplay.add(action); toReplay.add(action);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
++stopped;
} else {
++failed;
} }
} }
} }
logAndResubmit(server, toReplay, numAttempt, failureCount, throwable); if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, failureCount, throwable, isReplica, failed, stopped);
} else {
resubmit(server, toReplay, numAttempt, failureCount, throwable, isReplica);
}
} }
private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
Throwable error, long backOffTime, boolean willRetry, String startTime){ Throwable error, long backOffTime, boolean willRetry, String startTime,
boolean isReplica, int failed, int stopped) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("#").append(id).append(", table=").append(tableName).append(", ")
sb.append("#").append(id).append(", table=").append(tableName). .append(isReplica ? "replica, " : "primary, ").append("attempt=").append(numAttempt)
append(", attempt=").append(numAttempt).append("/").append(numTries).append(" "); .append("/").append(numTries).append(" ");
if (failureCount > 0 || error != null){ if (failureCount > 0 || error != null){
sb.append("failed ").append(failureCount).append(" ops").append(", last exception: "). sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
append(error == null ? "null" : error); append(error == null ? "null" : error);
} else { } else {
sb.append("SUCCEEDED"); sb.append("succeeded");
} }
sb.append(" on ").append(sn); sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
sb.append(", tracking started ").append(startTime);
if (willRetry) { if (willRetry) {
sb.append(", retrying after ").append(backOffTime).append(" ms"). sb.append(", retrying after ").append(backOffTime).append(" ms").
append(", replay ").append(replaySize).append(" ops."); append(", replay ").append(replaySize).append(" ops");
} else if (failureCount > 0) { } else if (failureCount > 0) {
sb.append(" - FAILED, NOT RETRYING ANYMORE"); if (stopped > 0) {
sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
}
if (failed > 0) {
sb.append("; not retrying ").append(failed).append(" - final failure");
}
} }
return sb.toString(); return sb.toString();
@ -1165,7 +1261,7 @@ class AsyncProcess {
decActionCounter(index); decActionCounter(index);
return; // Simple case, no replica requests. return; // Simple case, no replica requests.
} else if ((state = trySetResultSimple( } else if ((state = trySetResultSimple(
index, action.getAction(), result, isStale)) == null) { index, action.getAction(), false, result, null, isStale)) == null) {
return; // Simple case, no replica requests. return; // Simple case, no replica requests.
} }
assert state != null; assert state != null;
@ -1204,8 +1300,8 @@ class AsyncProcess {
errors.add(throwable, row, server); errors.add(throwable, row, server);
decActionCounter(index); decActionCounter(index);
return; // Simple case, no replica requests. return; // Simple case, no replica requests.
} else if ((state = trySetResultSimple(index, row, throwable, false)) == null) { } else if ((state = trySetResultSimple(
errors.add(throwable, row, server); index, row, true, throwable, server, false)) == null) {
return; // Simple case, no replica requests. return; // Simple case, no replica requests.
} }
assert state != null; assert state != null;
@ -1264,8 +1360,8 @@ class AsyncProcess {
* Tries to set the result or error for a particular action as if there were no replica calls. * Tries to set the result or error for a particular action as if there were no replica calls.
* @return null if successful; replica state if there were in fact replica calls. * @return null if successful; replica state if there were in fact replica calls.
*/ */
private ReplicaResultState trySetResultSimple( private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
int index, Row row, Object result, boolean isFromReplica) { Object result, ServerName server, boolean isFromReplica) {
Object resObj = null; Object resObj = null;
if (!isReplicaGet(row)) { if (!isReplicaGet(row)) {
if (isFromReplica) { if (isFromReplica) {
@ -1282,11 +1378,20 @@ class AsyncProcess {
} }
} }
} }
ReplicaResultState rrs =
(resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
if (rrs == null && isError) {
// The resObj is not replica state (null or already set).
errors.add((Throwable)result, row, server);
}
if (resObj == null) { if (resObj == null) {
// resObj is null - no replica calls were made.
decActionCounter(index); decActionCounter(index);
return null; return null;
} }
return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; return rrs;
} }
private void decActionCounter(int index) { private void decActionCounter(int index) {
@ -1528,4 +1633,15 @@ class AsyncProcess {
private static boolean isReplicaGet(Row row) { private static boolean isReplicaGet(Row row) {
return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
} }
/**
* For manageError. Only used to make logging more clear, we don't actually care why we don't retry.
*/
private enum Retry {
YES,
NO_LOCATION_PROBLEM,
NO_NOT_RETRIABLE,
NO_RETRIES_EXHAUSTED,
NO_OTHER_SUCCEEDED
}
} }

View File

@ -269,10 +269,4 @@ interface ClusterConnection extends HConnection {
* @return Default AsyncProcess associated with this connection. * @return Default AsyncProcess associated with this connection.
*/ */
AsyncProcess getAsyncProcess(); AsyncProcess getAsyncProcess();
/**
* @return All locations for a particular region.
*/
RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
} }

View File

@ -209,11 +209,6 @@ class ConnectionAdapter implements ClusterConnection {
return wrappedConnection.locateRegion(tableName, row, useCache, retry); return wrappedConnection.locateRegion(tableName, row, useCache, retry);
} }
@Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
return wrappedConnection.locateRegionAll(tableName, row);
}
@Override @Override
public void clearRegionCache() { public void clearRegionCache() {
wrappedConnection.clearRegionCache(); wrappedConnection.clearRegionCache();

View File

@ -1027,16 +1027,10 @@ class ConnectionManager {
return locateRegions(TableName.valueOf(tableName), useCache, offlined); return locateRegions(TableName.valueOf(tableName), useCache, offlined);
} }
@Override
public RegionLocations locateRegionAll(
final TableName tableName, final byte[] row) throws IOException{
return locateRegion(tableName, row, true, true);
}
@Override @Override
public HRegionLocation locateRegion( public HRegionLocation locateRegion(
final TableName tableName, final byte[] row) throws IOException{ final TableName tableName, final byte[] row) throws IOException{
RegionLocations locations = locateRegionAll(tableName, row); RegionLocations locations = locateRegion(tableName, row, true, true);
return locations == null ? null : locations.getRegionLocation(); return locations == null ? null : locations.getRegionLocation();
} }
@ -2468,12 +2462,12 @@ class ConnectionManager {
new ConcurrentHashMap<ServerName, ServerErrors>(); new ConcurrentHashMap<ServerName, ServerErrors>();
private final long canRetryUntil; private final long canRetryUntil;
private final int maxRetries; private final int maxRetries;
private final String startTrackingTime; private final long startTrackingTime;
public ServerErrorTracker(long timeout, int maxRetries) { public ServerErrorTracker(long timeout, int maxRetries) {
this.maxRetries = maxRetries; this.maxRetries = maxRetries;
this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout; this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
this.startTrackingTime = new Date().toString(); this.startTrackingTime = new Date().getTime();
} }
/** /**
@ -2520,7 +2514,7 @@ class ConnectionManager {
} }
} }
String getStartTrackingTime() { long getStartTrackingTime() {
return startTrackingTime; return startTrackingTime;
} }

View File

@ -37,8 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
@ -344,7 +346,8 @@ public class TestAsyncProcess {
} }
@Override @Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { public RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
return new RegionLocations(loc1); return new RegionLocations(loc1);
} }
} }
@ -363,7 +366,8 @@ public class TestAsyncProcess {
} }
@Override @Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { public RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
int i = 0; int i = 0;
for (HRegionLocation hr : hrl){ for (HRegionLocation hr : hrl){
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
@ -377,6 +381,9 @@ public class TestAsyncProcess {
} }
@Rule
public Timeout timeout = new Timeout(10000); // 10 seconds max per method tested
@Test @Test
public void testSubmit() throws Exception { public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection(); ClusterConnection hc = createHConnection();
@ -627,8 +634,8 @@ public class TestAsyncProcess {
private static void setMockLocation(ClusterConnection hc, byte[] row, private static void setMockLocation(ClusterConnection hc, byte[] row,
RegionLocations result) throws IOException { RegionLocations result) throws IOException {
Mockito.when(hc.locateRegionAll( Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result); Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
} }
private static ClusterConnection createHConnectionCommon() { private static ClusterConnection createHConnectionCommon() {
@ -1009,7 +1016,7 @@ public class TestAsyncProcess {
for (int i = 0; i < expecteds.length; ++i) { for (int i = 0; i < expecteds.length; ++i) {
Object actual = actuals[i]; Object actual = actuals[i];
RR expected = expecteds[i]; RR expected = expecteds[i];
Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable); Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
if (expected != RR.FAILED && expected != RR.DONT_CARE) { if (expected != RR.FAILED && expected != RR.DONT_CARE) {
Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
} }

View File

@ -443,9 +443,4 @@ class CoprocessorHConnection implements ClusterConnection {
public AsyncProcess getAsyncProcess() { public AsyncProcess getAsyncProcess() {
return delegate.getAsyncProcess(); return delegate.getAsyncProcess();
} }
@Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
return delegate.locateRegionAll(tableName, row);
}
} }

View File

@ -111,8 +111,9 @@ public class HConnectionTestingUtility {
thenReturn(loc); thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())). Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc); thenReturn(loc);
Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())). Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(),
thenReturn(new RegionLocations(loc)); Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()))
.thenReturn(new RegionLocations(loc));
if (admin != null) { if (admin != null) {
// If a call to getAdmin, return this implementation. // If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.any(ServerName.class))). Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).