HBASE-10634 Multiget doesn't fully work

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-04-10 00:50:45 +00:00 committed by Enis Soztutar
parent d313103aeb
commit 61bce90362
1 changed file with 102 additions and 58 deletions

View File

@ -136,12 +136,14 @@ class AsyncProcess {
/** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
int callCount;
/** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
* is not last, adds error to list. If all calls fail the last one sets this to list. */
Object result = null;
/** Errors for which it is not decided whether we will report them to user. If one of the
* calls succeeds, we will discard the errors that may have happened in the other calls. */
BatchErrors replicaErrors = null;
/**
 * Renders this tracking state for diagnostics: the current call count followed by the
 * pending replica errors.
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder();
  text.append("[call count ").append(callCount);
  text.append("; errors ").append(replicaErrors);
  text.append("]");
  return text.toString();
}
}
@ -622,6 +624,12 @@ class AsyncProcess {
replicaCount += (locs[i] != null) ? 1 : 0;
}
if (replicaCount == 0) {
// we could have got the replica back (if the server went down and the replica moved)
try {
loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true);
} catch (IOException e) {
manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
}
LOG.warn("No replicas found for " + action.getAction());
return;
}
@ -813,12 +821,23 @@ class AsyncProcess {
isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) {
// On retry, we couldn't find location for some replica we saw before.
String str = "Cannot find location for replica " + action.getReplicaId();
LOG.error(str);
manageError(action.getOriginalIndex(), action.getAction(),
false, new IOException(str), null);
continue;
try {
locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId());
loc = locs.getRegionLocation(action.getReplicaId());
} catch (IOException e) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
continue;
}
if (loc == null || loc.getServerName() == null) {
// On retry, we couldn't find location for some replica we saw before.
String str = "Cannot find location for replica " + action.getReplicaId();
LOG.error(str);
manageError(action.getOriginalIndex(), action.getAction(),
false, new IOException(str), null);
continue;
}
}
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
@ -1136,20 +1155,37 @@ class AsyncProcess {
* @param result The result.
*/
private void setResult(Action<Row> action, Object result) {
// NOTE(review): this listing is a diff hunk rendered without +/- markers, so statements from
// before and after the change both appear below (for example, both a combined
// "results == null || trySetResultSimple(...)" check and a split if / else-if form, and both
// decActionCounter() and decActionCounter(index)). Confirm against the real file before
// relying on the exact control flow shown here.
// A null result is rejected outright.
if (result == null) {
throw new RuntimeException("Result cannot be null");
}
ReplicaResultState state = null;
// isStale is true when the action's replica id is not the default replica, as reported by
// RegionReplicaUtil.isDefaultReplica; it is handed to trySetResultSimple below.
boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
if (results == null || ((state = trySetResultSimple(
action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
decActionCounter();
int index = action.getOriginalIndex();
if (results == null) {
decActionCounter(index);
return; // Simple case, no replica requests.
} else if ((state = trySetResultSimple(
index, action.getAction(), result, isStale)) == null) {
return; // Simple case, no replica requests.
}
assert state != null;
// At this point we know that state is set to the replica tracking class.
// It could be that someone else is also looking at it; however, we know there can
// only be one state object, and only one thread can set callCount to 0. Other threads
// will either see state with callCount 0 after locking it, or will not see state at all
// because we will have replaced it with the result.
synchronized (state) {
if (state.callCount == 0) return; // someone already set the result
state.result = result;
state.callCount = 0;
state.replicaErrors = null; // no longer matters
}
decActionCounter();
// Swap the tracking object stored at results[index] for the actual result while holding
// replicaResultLock; finding anything other than our own state object there is treated
// as a fatal inconsistency.
synchronized (replicaResultLock) {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
results[index] = result;
}
decActionCounter(index);
}
/**
@ -1161,19 +1197,24 @@ class AsyncProcess {
*/
private void setError(int index, Row row, Throwable throwable, ServerName server) {
ReplicaResultState state = null;
if (results == null
|| ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
if (results == null) {
// Note that we currently cannot have replica requests with null results. So it shouldn't
// happen that multiple replica calls will call dAC for same actions with results == null.
// Only one call per action should be present in this case.
errors.add(throwable, row, server);
decActionCounter(index);
return; // Simple case, no replica requests.
} else if ((state = trySetResultSimple(index, row, throwable, false)) == null) {
errors.add(throwable, row, server);
decActionCounter();
return; // Simple case, no replica requests.
}
assert state != null;
BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
boolean isActionDone = false;
synchronized (state) {
switch (state.callCount) {
case 0: return; // someone already set the result
case 1: { // All calls failed, we are the last error.
state.result = throwable;
target = errors;
isActionDone = true;
break;
@ -1190,12 +1231,19 @@ class AsyncProcess {
--state.callCount;
}
target.add(throwable, row, server);
if (!isActionDone) return;
if (state.replicaErrors != null) { // last call, no need to lock
errors.merge(state.replicaErrors);
state.replicaErrors = null;
if (isActionDone) {
if (state.replicaErrors != null) { // last call, no need to lock
errors.merge(state.replicaErrors);
}
// See setResult for explanations.
synchronized (replicaResultLock) {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
results[index] = throwable;
}
decActionCounter(index);
}
decActionCounter();
}
/**
@ -1234,47 +1282,43 @@ class AsyncProcess {
}
}
}
return (resObj == null || !(resObj instanceof ReplicaResultState))
? null : (ReplicaResultState)resObj;
if (resObj == null) {
decActionCounter(index);
return null;
}
return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
}
// NOTE(review): this span comes from a diff hunk rendered without +/- markers. Lines of
// decActionCounter(), decActionCounter(int), buildDetailedErrorMsg(String, int) and
// convertReplicaResult(int) are interleaved below, so the text does not form valid Java as
// shown. Confirm against the real file which of these methods exist and what each contains.
private void decActionCounter() {
if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
// Convert replica sync structures to results.
int staleCount = 0;
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
staleCount += convertReplicaResult(i) ? 1 : 0;
}
} else {
for (int i = 0; i < replicaGetIndices.length; ++i) {
staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
}
private void decActionCounter(int index) {
// Decrement and read the in-progress counter in a single call; the value returned decides
// between the error path (negative) and the wake-up path (zero).
long actionsRemaining = actionsInProgress.decrementAndGet();
if (actionsRemaining < 0) {
String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
throw new AssertionError(error);
} else if (actionsRemaining == 0) {
synchronized (actionsInProgress) {
actionsInProgress.notifyAll();
}
if (!actionsInProgress.compareAndSet(1, 0)) {
throw new AssertionError("Cannot set actions in progress to 0");
}
if (staleCount > 0) {
LOG.trace("Returning " + staleCount + " stale results");
}
}
private String buildDetailedErrorMsg(String string, int index) {
// The message carries the caller's text, the index, the current counter value, the replica
// get indices (or "all" / "none" when there is no index array) and every entry of results.
String error = string + "; called for " + index +
", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
if (replicaGetIndices != null) {
for (int i = 0; i < replicaGetIndices.length; ++i) {
error += replicaGetIndices[i] + ", ";
}
} else {
actionsInProgress.decrementAndGet();
error += (hasAnyReplicaGets ? "all" : "none");
}
synchronized (actionsInProgress) {
actionsInProgress.notifyAll();
error += "; results ";
if (results != null) {
for (int i = 0; i < results.length; ++i) {
Object o = results[i];
error += ((o == null) ? "null" : o.toString()) + ", ";
}
}
}
private boolean convertReplicaResult(int index) {
if (!(results[index] instanceof ReplicaResultState)) return false;
ReplicaResultState state = (ReplicaResultState)results[index];
// We know that no one will touch state with 0 callCount, so there is no need to lock.
if (state.callCount != 0) {
throw new AssertionError("Actions are done but callcount is " + state.callCount);
}
// TODO: we expect the Result coming from server to already have "isStale" specified.
Object res = results[index] = state.result;
return (res instanceof Result) && ((Result)res).isStale();
return error;
}
@Override