HBASE-16631 Extract AsyncRequestFuture related code from AsyncProcess

This commit is contained in:
chenheng 2016-09-17 00:35:23 +08:00
parent b6b72361b6
commit 2cf8907db5
8 changed files with 1482 additions and 1346 deletions

View File

@ -0,0 +1,40 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.InterruptedIOException;
import java.util.List;
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
* then errors and failed operations in this object will reflect global errors.
* 2) If submit call is made with needResults false, results will not be saved.
* */
@InterfaceAudience.Private
public interface AsyncRequestFuture {
public boolean hasError();
public RetriesExhaustedWithDetailsException getErrors();
public List<? extends Row> getFailedOperations();
public Object[] getResults() throws InterruptedIOException;
/** Wait until all tasks are executed, successfully or not. */
public void waitUntilDone() throws InterruptedIOException;
}

View File

@ -0,0 +1,69 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import java.util.ArrayList;
import java.util.List;
class BatchErrors {
private static final Log LOG = LogFactory.getLog(BatchErrors.class);
final List<Throwable> throwables = new ArrayList<Throwable>();
final List<Row> actions = new ArrayList<Row>();
final List<String> addresses = new ArrayList<String>();
public synchronized void add(Throwable ex, Row row, ServerName serverName) {
if (row == null){
throw new IllegalArgumentException("row cannot be null. location=" + serverName);
}
throwables.add(ex);
actions.add(row);
addresses.add(serverName != null ? serverName.toString() : "null");
}
public boolean hasErrors() {
return !throwables.isEmpty();
}
synchronized RetriesExhaustedWithDetailsException makeException(boolean logDetails) {
if (logDetails) {
LOG.error("Exception occurred! Exception details: " + throwables + ";\nActions: "
+ actions);
}
return new RetriesExhaustedWithDetailsException(new ArrayList<Throwable>(throwables),
new ArrayList<Row>(actions), new ArrayList<String>(addresses));
}
public synchronized void clear() {
throwables.clear();
actions.clear();
addresses.clear();
}
public synchronized void merge(BatchErrors other) {
throwables.addAll(other.throwables);
actions.addAll(other.actions);
addresses.addAll(other.addresses);
}
}

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

View File

@ -59,8 +59,6 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
@ -162,8 +160,9 @@ public class TestAsyncProcess {
Batch.Callback<Res> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int curTimeout) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, rpcTimeout);
AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
results, callback, callable, curTimeout, this);
allReqs.add(r);
return r;
}
@ -212,18 +211,14 @@ public class TestAsyncProcess {
previousTimeout = curTimeout;
return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
}
@Override
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
callable1.getMulti(), nbMultiResponse, nbActions,
new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
@ -250,6 +245,28 @@ public class TestAsyncProcess {
}
};
}
}
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback callback,
CancellableRegionServerCallable callable, int timeout,
AsyncProcess asyncProcess) {
super(tableName, actions, nonceGroup, pool, needResults,
results, callback, callable, timeout, asyncProcess);
}
@Override
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
}
static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
@ -287,6 +304,7 @@ public class TestAsyncProcess {
return new CallerWithFailure(ioe);
}
}
/**
* Make the backoff time always different on each call.
*/
@ -816,7 +834,7 @@ public class TestAsyncProcess {
puts.add(createPut(1, true));
// Wait for AP to be free. While ars might have the result, ap counters are decreased later.
ap.waitUntilDone();
ap.waitForMaximumCurrentTasks(0, null);
ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
@ -869,7 +887,7 @@ public class TestAsyncProcess {
puts.add(createPut(3, true));
}
ap.submit(null, DUMMY_TABLE, puts, true, null, false);
ap.waitUntilDone();
ap.waitForMaximumCurrentTasks(0, null);
// More time to wait if there are incorrect task count.
TimeUnit.SECONDS.sleep(1);
assertEquals(0, ap.tasksInProgress.get());
@ -1051,7 +1069,7 @@ public class TestAsyncProcess {
Put p = createPut(1, false);
mutator.mutate(p);
ap.waitUntilDone(); // Let's do all the retries.
ap.waitForMaximumCurrentTasks(0, null); // Let's do all the retries.
// We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
// doPut if it fails.

View File

@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;