diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 252e808c5c8..3c6f5be9c81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -136,12 +136,14 @@ class AsyncProcess { /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ int callCount; - /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but - * is not last, adds error to list. If all calls fail the last one sets this to list. */ - Object result = null; /** Errors for which it is not decided whether we will report them to user. If one of the * calls succeeds, we will discard the errors that may have happened in the other calls. */ BatchErrors replicaErrors = null; + + @Override + public String toString() { + return "[call count " + callCount + "; errors " + replicaErrors + "]"; + } } @@ -622,6 +624,12 @@ class AsyncProcess { replicaCount += (locs[i] != null) ? 1 : 0; } if (replicaCount == 0) { + // we could have got the replica back (if the server went down and the replica moved) + try { + loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true); + } catch (IOException e) { + manageError(action.getOriginalIndex(), action.getAction(), false, e, null); + } LOG.warn("No replicas found for " + action.getAction()); return; } @@ -813,12 +821,23 @@ class AsyncProcess { isReplica = isReplicaAction; HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); if (loc == null || loc.getServerName() == null) { - // On retry, we couldn't find location for some replica we saw before. - String str = "Cannot find location for replica " + action.getReplicaId(); - LOG.error(str); - manageError(action.getOriginalIndex(), action.getAction(), - false, new IOException(str), null); - continue; + try { + locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId()); + loc = locs.getRegionLocation(action.getReplicaId()); + } catch (IOException e) { + // There are multiple retries in locateRegion already. No need to add new. + // We can't continue with this row, hence it's the last retry. + manageError(action.getOriginalIndex(), action.getAction(), false, e, null); + continue; + } + if (loc == null || loc.getServerName() == null) { + // On retry, we couldn't find location for some replica we saw before. + String str = "Cannot find location for replica " + action.getReplicaId(); + LOG.error(str); + manageError(action.getOriginalIndex(), action.getAction(), + false, new IOException(str), null); + continue; + } } byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); @@ -1136,20 +1155,37 @@ class AsyncProcess { * @param result The result. */ private void setResult(Action action, Object result) { + if (result == null) { + throw new RuntimeException("Result cannot be null"); + } ReplicaResultState state = null; boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); - if (results == null || ((state = trySetResultSimple( - action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) { - decActionCounter(); + int index = action.getOriginalIndex(); + if (results == null) { + decActionCounter(index); + return; // Simple case, no replica requests. + } else if ((state = trySetResultSimple( + index, action.getAction(), result, isStale)) == null) { return; // Simple case, no replica requests. } + assert state != null; + // At this point we know that state is set to replica tracking class. + // It could be that someone else is also looking at it; however, we know there can + // only be one state object, and only one thread can set callCount to 0. Other threads + // will either see state with callCount 0 after locking it; or will not see state at all + // we will replace it with the result. synchronized (state) { if (state.callCount == 0) return; // someone already set the result - state.result = result; state.callCount = 0; - state.replicaErrors = null; // no longer matters } - decActionCounter(); + synchronized (replicaResultLock) { + if (results[index] != state) { + throw new AssertionError("We set the callCount but someone else replaced the result"); + } + results[index] = result; + } + + decActionCounter(index); } /** @@ -1161,19 +1197,24 @@ class AsyncProcess { */ private void setError(int index, Row row, Throwable throwable, ServerName server) { ReplicaResultState state = null; - if (results == null - || ((state = trySetResultSimple(index, row, throwable, false)) == null)) { + if (results == null) { + // Note that we currently cannot have replica requests with null results. So it shouldn't + // happen that multiple replica calls will call dAC for same actions with results == null. + // Only one call per action should be present in this case. + errors.add(throwable, row, server); + decActionCounter(index); + return; // Simple case, no replica requests. + } else if ((state = trySetResultSimple(index, row, throwable, false)) == null) { errors.add(throwable, row, server); - decActionCounter(); return; // Simple case, no replica requests. } + assert state != null; BatchErrors target = null; // Error will be added to final errors, or temp replica errors. boolean isActionDone = false; synchronized (state) { switch (state.callCount) { case 0: return; // someone already set the result case 1: { // All calls failed, we are the last error. - state.result = throwable; target = errors; isActionDone = true; break; @@ -1190,12 +1231,19 @@ class AsyncProcess { --state.callCount; } target.add(throwable, row, server); - if (!isActionDone) return; - if (state.replicaErrors != null) { // last call, no need to lock - errors.merge(state.replicaErrors); - state.replicaErrors = null; + if (isActionDone) { + if (state.replicaErrors != null) { // last call, no need to lock + errors.merge(state.replicaErrors); + } + // See setResult for explanations. + synchronized (replicaResultLock) { + if (results[index] != state) { + throw new AssertionError("We set the callCount but someone else replaced the result"); + } + results[index] = throwable; + } + decActionCounter(index); } - decActionCounter(); } /** @@ -1234,47 +1282,43 @@ class AsyncProcess { } } } - return (resObj == null || !(resObj instanceof ReplicaResultState)) - ? null : (ReplicaResultState)resObj; + if (resObj == null) { + decActionCounter(index); + return null; + } + return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; } - private void decActionCounter() { - if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) { - // Convert replica sync structures to results. - int staleCount = 0; - if (replicaGetIndices == null) { - for (int i = 0; i < results.length; ++i) { - staleCount += convertReplicaResult(i) ? 1 : 0; - } - } else { - for (int i = 0; i < replicaGetIndices.length; ++i) { - staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0; - } + private void decActionCounter(int index) { + long actionsRemaining = actionsInProgress.decrementAndGet(); + if (actionsRemaining < 0) { + String error = buildDetailedErrorMsg("Incorrect actions in progress", index); + throw new AssertionError(error); + } else if (actionsRemaining == 0) { + synchronized (actionsInProgress) { + actionsInProgress.notifyAll(); } - if (!actionsInProgress.compareAndSet(1, 0)) { - throw new AssertionError("Cannot set actions in progress to 0"); - } - if (staleCount > 0) { - LOG.trace("Returning " + staleCount + " stale results"); + } + } + + private String buildDetailedErrorMsg(String string, int index) { + String error = string + "; called for " + index + + ", actionsInProgress " + actionsInProgress.get() + "; replica gets: "; + if (replicaGetIndices != null) { + for (int i = 0; i < replicaGetIndices.length; ++i) { + error += replicaGetIndices[i] + ", "; } } else { - actionsInProgress.decrementAndGet(); + error += (hasAnyReplicaGets ? "all" : "none"); } - synchronized (actionsInProgress) { - actionsInProgress.notifyAll(); + error += "; results "; + if (results != null) { + for (int i = 0; i < results.length; ++i) { + Object o = results[i]; + error += ((o == null) ? "null" : o.toString()) + ", "; + } } - } - - private boolean convertReplicaResult(int index) { - if (!(results[index] instanceof ReplicaResultState)) return false; - ReplicaResultState state = (ReplicaResultState)results[index]; - // We know that noone will touch state with 0 callCount, no need to lock - if (state.callCount != 0) { - throw new AssertionError("Actions are done but callcount is " + state.callCount); - } - // TODO: we expect the Result coming from server to already have "isStale" specified. - Object res = results[index] = state.result; - return (res instanceof Result) && ((Result)res).isStale(); + return error; } @Override