HBASE-9768 Two issues in AsyncProcess

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1533604 13f79535-47bb-0310-9956-ffa450edef68
nkeywal 2013-10-18 19:34:18 +00:00
parent 000596f7c3
commit e17f094ddb
2 changed files with 32 additions and 18 deletions
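
Taken together, the hunks below fix: in AsyncProcess, a submit loop that could spin without progress when every region was at its task limit (and kept retrying rows whose location lookup failed), plus failure bookkeeping that updated the location cache only for the first failure in a server response; in HTable, a Put that could be lost when a pending error forced a synchronous flush, and flush logic that now loops inside backgroundFlushCommits itself instead of in flushCommits.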

AsyncProcess.java

@@ -265,7 +265,18 @@ class AsyncProcess<CResult> {
         new HashMap<HRegionLocation, MultiAction<Row>>();
     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
 
+    long currentTaskCnt = tasksDone.get();
+    boolean alreadyLooped = false;
+
     do {
+      if (alreadyLooped) {
+        // if, for whatever reason, we looped, we want to be sure that something has changed.
+        waitForNextTaskDone(currentTaskCnt);
+        currentTaskCnt = tasksDone.get();
+      } else {
+        alreadyLooped = true;
+      }
+
       // Wait until there is at least one slot for a new task.
       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
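
The added guard fixes a potential busy-loop: if a pass over the rows submits nothing because every region is already at its task limit, the next pass should block until at least one in-flight task completes. A minimal sketch of that wait-for-progress pattern, with the counter and method names mirroring the diff but the synchronization details assumed rather than copied from AsyncProcess:

import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the wait-for-progress pattern used above: record the done-counter,
// then block until it has advanced before retrying.
class TaskProgress {
  private final AtomicLong tasksDone = new AtomicLong(0);

  // Called by each task as it completes.
  void taskFinished() {
    synchronized (tasksDone) {
      tasksDone.incrementAndGet();
      tasksDone.notifyAll();
    }
  }

  long currentCount() {
    return tasksDone.get();
  }

  // Blocks until tasksDone has moved past the value the caller recorded,
  // so a retry loop cannot spin while nothing has changed.
  void waitForNextTaskDone(long currentTaskCnt) throws InterruptedIOException {
    synchronized (tasksDone) {
      while (tasksDone.get() == currentTaskCnt) {
        try {
          tasksDone.wait(100);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("interrupted while waiting for a task to finish");
        }
      }
    }
  }
}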
@@ -280,8 +291,9 @@ class AsyncProcess<CResult> {
         Row r = it.next();
         HRegionLocation loc = findDestLocation(r, 1, posInList);
 
-        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
-          // loc is null if there is an error such as meta not available.
+        if (loc == null) { // loc is null if there is an error such as meta not available.
+          it.remove();
+        } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
           Action<Row> action = new Action<Row>(r, ++posInList);
           retainedActions.add(action);
           addAction(loc, action, actionsByServer);
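
This branch also stops unroutable rows from being retried forever: with the do/while above, a row whose location cannot be found would otherwise stay in the list on every pass. Removing it through the iterator is the safe way to drop it mid-iteration; a generic reminder, with illustrative list contents:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorRemoveDemo {
  public static void main(String[] args) {
    List<String> rows = new ArrayList<String>(Arrays.asList("a", null, "b"));
    Iterator<String> it = rows.iterator();
    while (it.hasNext()) {
      String r = it.next();
      if (r == null) {
        // removing via rows.remove(r) instead would make the next it.next()
        // throw ConcurrentModificationException
        it.remove();
      }
    }
    System.out.println(rows); // prints [a, b]
  }
}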
@@ -644,6 +656,7 @@ class AsyncProcess<CResult> {
     for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
         responses.getResults().entrySet()) {
 
+      boolean regionFailureRegistered = false;
       for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
         Object result = regionResult.getSecond();
@@ -652,8 +665,9 @@ class AsyncProcess<CResult> {
           throwable = (Throwable) result;
           Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
           Row row = correspondingAction.getAction();
+          failureCount++;
 
-          if (failureCount++ == 0) { // We're doing this once per location.
+          if (!regionFailureRegistered) { // We're doing this once per location.
+            regionFailureRegistered = true;
             hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
             if (errorsByServer != null) {
               errorsByServer.reportServerError(location);
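
These two hunks fix the failure bookkeeping: failureCount is shared across the whole server response, so the old failureCount++ == 0 test was true only for the very first failure overall, and cache invalidation plus server-error reporting ran at most once per response rather than once per failed region. The new flag is reset for each region's result list. A condensed sketch of the difference, with illustrative data shapes:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OncePerRegionDemo {
  public static void main(String[] args) {
    Map<String, List<Boolean>> failuresByRegion = new LinkedHashMap<String, List<Boolean>>();
    failuresByRegion.put("region-a", Arrays.asList(true, true));
    failuresByRegion.put("region-b", Arrays.asList(true));

    int failureCount = 0;
    for (Map.Entry<String, List<Boolean>> region : failuresByRegion.entrySet()) {
      boolean regionFailureRegistered = false; // reset per region, unlike failureCount
      for (boolean failed : region.getValue()) {
        if (!failed) continue;
        failureCount++;
        if (!regionFailureRegistered) { // fires once per region...
          regionFailureRegistered = true;
          System.out.println("update cached location for " + region.getKey());
        }
        // ...whereas `failureCount++ == 0` would have fired for region-a only
      }
    }
    System.out.println(failureCount + " failures total");
  }
}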

HTable.java

@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@@ -889,6 +887,7 @@ public class HTable implements HTableInterface {
    */
   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
     if (ap.hasError()){
+      writeAsyncBuffer.add(put);
       backgroundFlushCommits(true);
     }
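
The one-line addition closes a window where a Put could vanish: with a pending error, the synchronous flush throws RetriesExhaustedWithDetailsException before doPut ever reaches the code that buffers the new Put. Adding it to writeAsyncBuffer first means a throwing flush still accounts for it. A toy illustration of the ordering, where the names and the always-failing flush are stand-ins:

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

public class BufferBeforeFlush {
  static final Queue<String> writeAsyncBuffer = new ArrayDeque<String>();

  static void flushSynchronously() throws IOException {
    // Pretend earlier operations already failed: the flush reports them.
    throw new IOException("previous flush exhausted its retries");
  }

  static void doPut(String put) throws IOException {
    writeAsyncBuffer.add(put); // buffer first, as in the fixed doPut()
    flushSynchronously();      // even if this throws, `put` is not lost
  }

  public static void main(String[] args) {
    try {
      doPut("row-1");
    } catch (IOException e) {
      System.out.println("flush failed, buffer still holds: " + writeAsyncBuffer);
    }
  }
}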
@@ -907,25 +906,29 @@ public class HTable implements HTableInterface {
    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
    * If there is an error (max retries reached from a previous flush or a bad operation), it
    * tries to send all operations in the buffer and sends an exception.
+   * @param synchronous - if true, sends all the writes and waits for all of them to finish
+   *                      before returning.
    */
   private void backgroundFlushCommits(boolean synchronous) throws
       InterruptedIOException, RetriesExhaustedWithDetailsException {
 
     try {
-      // If there is an error on the operations in progress, we don't add new operations.
-      if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
+      do {
         ap.submit(writeAsyncBuffer, true);
-      }
+      } while (synchronous && !writeAsyncBuffer.isEmpty());
 
-      if (synchronous || ap.hasError()) {
+      if (synchronous) {
+        if (ap.hasError() && LOG.isDebugEnabled()) {
+          LOG.debug(tableName + ": One or more of the operations have failed -" +
+              " waiting for all operation in progress to finish (successfully or not)");
+        }
         ap.waitUntilDone();
       }
 
       if (ap.hasError()) {
-        LOG.debug(tableName + ": One or more of the operations have failed -" +
-            " waiting for all operation in progress to finish (successfully or not)");
+        while (!writeAsyncBuffer.isEmpty()) {
+          ap.submit(writeAsyncBuffer, true);
+        }
+        ap.waitUntilDone();
+
         if (!clearBufferOnFail) {
           // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
           // write buffer. This is a questionable feature kept here for backward compatibility
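
The restructured method changes the flush contract: ap.submit() can take only part of the buffer (rows routed to busy regions are left behind), so a synchronous flush now loops until the buffer drains instead of relying on the caller to loop, and the error path drains the buffer the same way before waiting. A stripped-down sketch of that shape, where the queue and the partial submit are stand-ins:

import java.util.ArrayDeque;
import java.util.Queue;

class FlushLoopSketch {
  private final Queue<Runnable> buffer = new ArrayDeque<Runnable>();

  // Stand-in for ap.submit(writeAsyncBuffer, true): may leave items behind.
  private void submitWhatWeCan() {
    Runnable op = buffer.poll(); // pretend only one op is routable per pass
    if (op != null) op.run();
  }

  void backgroundFlushCommits(boolean synchronous) {
    do {
      submitWhatWeCan();
    } while (synchronous && !buffer.isEmpty()); // drain before declaring the flush done
    // the real method then waits for in-flight tasks and replays the buffer on error
  }
}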
@@ -1186,12 +1189,9 @@ public class HTable implements HTableInterface {
    */
   @Override
   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    // We're looping, as if one region is overloaded we keep its operations in the buffer.
     // As we can have an operation in progress even if the buffer is empty, we call
     // backgroundFlushCommits at least one time.
-    do {
-      backgroundFlushCommits(true);
-    } while (!writeAsyncBuffer.isEmpty());
+    backgroundFlushCommits(true);
   }
 
   /**
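
For context, the client-side path this simplification serves, sketched against the trunk-era API (the table and column names are made up): with autoflush off, put() lands in writeAsyncBuffer via doPut(), and flushCommits() is now a single call because backgroundFlushCommits(true) does the looping itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");
    try {
      table.setAutoFlush(false); // buffer puts client-side
      for (int i = 0; i < 1000; i++) {
        Put p = new Put(Bytes.toBytes("row-" + i));
        p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));
        table.put(p); // may trigger a background flush when the buffer fills
      }
      table.flushCommits(); // one synchronous flush; the looping happens inside
    } finally {
      table.close();
    }
  }
}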