HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
992e5717d4
commit
8cb55c4080
|
@ -19,45 +19,35 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
|
||||
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* This class allows a continuous flow of requests. It's written to be compatible with a
|
||||
|
@ -95,9 +85,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
* </p>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
class AsyncProcess {
|
||||
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
|
||||
protected static final AtomicLong COUNTER = new AtomicLong();
|
||||
private static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
|
||||
|
||||
|
@ -116,31 +107,6 @@ class AsyncProcess {
|
|||
*/
|
||||
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
|
||||
|
||||
protected final int thresholdToLogUndoneTaskDetails;
|
||||
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
|
||||
"hbase.client.threshold.log.details";
|
||||
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
|
||||
private static final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
|
||||
|
||||
/**
|
||||
* The maximum size of single RegionServer.
|
||||
*/
|
||||
public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
|
||||
|
||||
/**
|
||||
* Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
|
||||
*/
|
||||
public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
|
||||
|
||||
/**
|
||||
* The maximum size of submit.
|
||||
*/
|
||||
public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
|
||||
/**
|
||||
* Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
|
||||
*/
|
||||
public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
|
||||
|
||||
/**
|
||||
* Return value from a submit that didn't contain any requests.
|
||||
*/
|
||||
|
@ -173,64 +139,42 @@ class AsyncProcess {
|
|||
};
|
||||
|
||||
// TODO: many of the fields should be made private
|
||||
protected final long id;
|
||||
final long id;
|
||||
|
||||
protected final ClusterConnection connection;
|
||||
protected final RpcRetryingCallerFactory rpcCallerFactory;
|
||||
protected final RpcControllerFactory rpcFactory;
|
||||
protected final BatchErrors globalErrors;
|
||||
protected final ExecutorService pool;
|
||||
final ClusterConnection connection;
|
||||
private final RpcRetryingCallerFactory rpcCallerFactory;
|
||||
final RpcControllerFactory rpcFactory;
|
||||
final BatchErrors globalErrors;
|
||||
|
||||
protected final AtomicLong tasksInProgress = new AtomicLong(0);
|
||||
protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
|
||||
new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
|
||||
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
|
||||
new ConcurrentHashMap<ServerName, AtomicInteger>();
|
||||
// Start configuration settings.
|
||||
protected final int startLogErrorsCnt;
|
||||
final int startLogErrorsCnt;
|
||||
|
||||
/**
|
||||
* The number of tasks simultaneously executed on the cluster.
|
||||
*/
|
||||
protected final int maxTotalConcurrentTasks;
|
||||
|
||||
/**
|
||||
* The max heap size of all tasks simultaneously executed on a server.
|
||||
*/
|
||||
protected final long maxHeapSizePerRequest;
|
||||
protected final long maxHeapSizeSubmit;
|
||||
/**
|
||||
* The number of tasks we run in parallel on a single region.
|
||||
* With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
|
||||
* a set of operations on a region before the previous one is done. As well, this limits
|
||||
* the pressure we put on the region server.
|
||||
*/
|
||||
protected final int maxConcurrentTasksPerRegion;
|
||||
|
||||
/**
|
||||
* The number of task simultaneously executed on a single region server.
|
||||
*/
|
||||
protected final int maxConcurrentTasksPerServer;
|
||||
protected final long pause;
|
||||
protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
protected int numTries;
|
||||
protected int serverTrackerTimeout;
|
||||
protected int rpcTimeout;
|
||||
protected int operationTimeout;
|
||||
protected long primaryCallTimeoutMicroseconds;
|
||||
final long pause;
|
||||
final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
final int numTries;
|
||||
@VisibleForTesting
|
||||
int serverTrackerTimeout;
|
||||
final long primaryCallTimeoutMicroseconds;
|
||||
/** Whether to log details for batch errors */
|
||||
protected final boolean logBatchErrorDetails;
|
||||
final boolean logBatchErrorDetails;
|
||||
// End configuration settings.
|
||||
|
||||
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
|
||||
/**
|
||||
* The traffic control for requests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
final RequestController requestController;
|
||||
public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
|
||||
private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
|
||||
private final int periodToLog;
|
||||
AsyncProcess(ClusterConnection hc, Configuration conf,
|
||||
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
|
||||
RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
|
||||
RpcControllerFactory rpcFactory) {
|
||||
if (hc == null) {
|
||||
throw new IllegalArgumentException("ClusterConnection cannot be null.");
|
||||
}
|
||||
|
||||
this.connection = hc;
|
||||
this.pool = pool;
|
||||
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
|
||||
|
||||
this.id = COUNTER.incrementAndGet();
|
||||
|
@ -249,42 +193,10 @@ class AsyncProcess {
|
|||
// how many times we could try in total, one more than retry number
|
||||
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.operationTimeout = operationTimeout;
|
||||
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
|
||||
|
||||
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
|
||||
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
|
||||
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
|
||||
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
|
||||
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
|
||||
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
|
||||
if (this.maxTotalConcurrentTasks <= 0) {
|
||||
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
|
||||
}
|
||||
if (this.maxConcurrentTasksPerServer <= 0) {
|
||||
throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
|
||||
maxConcurrentTasksPerServer);
|
||||
}
|
||||
if (this.maxConcurrentTasksPerRegion <= 0) {
|
||||
throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
|
||||
maxConcurrentTasksPerRegion);
|
||||
}
|
||||
if (this.maxHeapSizePerRequest <= 0) {
|
||||
throw new IllegalArgumentException("maxHeapSizePerServer=" +
|
||||
maxHeapSizePerRequest);
|
||||
}
|
||||
|
||||
if (this.maxHeapSizeSubmit <= 0) {
|
||||
throw new IllegalArgumentException("maxHeapSizeSubmit=" +
|
||||
maxHeapSizeSubmit);
|
||||
}
|
||||
this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
|
||||
// Server tracker allows us to do faster, and yet useful (hopefully), retries.
|
||||
// However, if we are too useful, we might fail very quickly due to retry count limit.
|
||||
// To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
|
||||
|
@ -301,43 +213,30 @@ class AsyncProcess {
|
|||
this.rpcFactory = rpcFactory;
|
||||
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
|
||||
|
||||
this.thresholdToLogUndoneTaskDetails =
|
||||
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
|
||||
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
|
||||
this.requestController = RequestControllerFactory.create(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return pool if non null, otherwise returns this.pool if non null, otherwise throws
|
||||
* RuntimeException
|
||||
* The submitted task may be not accomplished at all if there are too many running tasks or
|
||||
* other limits.
|
||||
* @param <CResult> The class to cast the result
|
||||
* @param task The setting and data
|
||||
* @return AsyncRequestFuture
|
||||
*/
|
||||
protected ExecutorService getPool(ExecutorService pool) {
|
||||
if (pool != null) {
|
||||
return pool;
|
||||
public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException {
|
||||
AsyncRequestFuture reqFuture = checkTask(task);
|
||||
if (reqFuture != null) {
|
||||
return reqFuture;
|
||||
}
|
||||
if (this.pool != null) {
|
||||
return this.pool;
|
||||
SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
|
||||
switch (submittedRows) {
|
||||
case ALL:
|
||||
return submitAll(task);
|
||||
case AT_LEAST_ONE:
|
||||
return submit(task, true);
|
||||
default:
|
||||
return submit(task, false);
|
||||
}
|
||||
throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
|
||||
}
|
||||
|
||||
/**
|
||||
* See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean).
|
||||
* Uses default ExecutorService for this AP (must have been created with one).
|
||||
*/
|
||||
public <CResult> AsyncRequestFuture submit(TableName tableName,
|
||||
final RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
|
||||
boolean needResults) throws InterruptedIOException {
|
||||
return submit(null, tableName, rows, atLeastOne, callback, needResults);
|
||||
}
|
||||
/**
|
||||
* See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
|
||||
* Uses the {@link ListRowAccess} to wrap the {@link List}.
|
||||
*/
|
||||
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
|
||||
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
|
||||
boolean needResults) throws InterruptedIOException {
|
||||
return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
|
||||
callback, needResults);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -345,20 +244,13 @@ class AsyncProcess {
|
|||
* list. Does not send requests to replicas (not currently used for anything other
|
||||
* than streaming puts anyway).
|
||||
*
|
||||
* @param pool ExecutorService to use.
|
||||
* @param tableName The table for which this request is needed.
|
||||
* @param callback Batch callback. Only called on success (94 behavior).
|
||||
* @param needResults Whether results are needed, or can be discarded.
|
||||
* @param rows - the submitted row. Modified by the method: we remove the rows we took.
|
||||
* @param task The setting and data
|
||||
* @param atLeastOne true if we should submit at least a subset.
|
||||
*/
|
||||
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
|
||||
RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
|
||||
boolean needResults) throws InterruptedIOException {
|
||||
if (rows.isEmpty()) {
|
||||
return NO_REQS_RESULT;
|
||||
}
|
||||
|
||||
private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
|
||||
boolean atLeastOne) throws InterruptedIOException {
|
||||
TableName tableName = task.getTableName();
|
||||
RowAccess<? extends Row> rows = task.getRowAccess();
|
||||
Map<ServerName, MultiAction> actionsByServer =
|
||||
new HashMap<ServerName, MultiAction>();
|
||||
List<Action> retainedActions = new ArrayList<Action>(rows.size());
|
||||
|
@ -369,11 +261,11 @@ class AsyncProcess {
|
|||
// Location errors that happen before we decide what requests to take.
|
||||
List<Exception> locationErrors = null;
|
||||
List<Integer> locationErrorRows = null;
|
||||
RowCheckerHost checker = createRowCheckerHost();
|
||||
RequestController.Checker checker = requestController.newChecker();
|
||||
boolean firstIter = true;
|
||||
do {
|
||||
// Wait until there is at least one slot for a new task.
|
||||
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
|
||||
requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
|
||||
int posInList = -1;
|
||||
if (!firstIter) {
|
||||
checker.reset();
|
||||
|
@ -406,8 +298,7 @@ class AsyncProcess {
|
|||
it.remove();
|
||||
break; // Backward compat: we stop considering actions on location error.
|
||||
}
|
||||
long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0;
|
||||
ReturnCode code = checker.canTakeOperation(loc, rowSize);
|
||||
ReturnCode code = checker.canTakeRow(loc, r);
|
||||
if (code == ReturnCode.END) {
|
||||
break;
|
||||
}
|
||||
|
@ -426,29 +317,14 @@ class AsyncProcess {
|
|||
|
||||
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
|
||||
|
||||
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
|
||||
locationErrors, locationErrorRows, actionsByServer, pool);
|
||||
return submitMultiActions(task, retainedActions, nonceGroup,
|
||||
locationErrors, locationErrorRows, actionsByServer);
|
||||
}
|
||||
|
||||
private RowCheckerHost createRowCheckerHost() {
|
||||
return new RowCheckerHost(Arrays.asList(
|
||||
new TaskCountChecker(maxTotalConcurrentTasks,
|
||||
maxConcurrentTasksPerServer,
|
||||
maxConcurrentTasksPerRegion,
|
||||
tasksInProgress,
|
||||
taskCounterPerServer,
|
||||
taskCounterPerRegion)
|
||||
, new RequestSizeChecker(maxHeapSizePerRequest)
|
||||
, new SubmittedSizeChecker(maxHeapSizeSubmit)
|
||||
));
|
||||
}
|
||||
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
|
||||
List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
|
||||
Object[] results, boolean needResults, List<Exception> locationErrors,
|
||||
List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer,
|
||||
ExecutorService pool) {
|
||||
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
|
||||
tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
|
||||
<CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
|
||||
List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
|
||||
List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
|
||||
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, retainedActions, nonceGroup);
|
||||
// Add location errors if any
|
||||
if (locationErrors != null) {
|
||||
for (int i = 0; i < locationErrors.size(); ++i) {
|
||||
|
@ -462,14 +338,6 @@ class AsyncProcess {
|
|||
return ars;
|
||||
}
|
||||
|
||||
public void setRpcTimeout(int rpcTimeout) {
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
}
|
||||
|
||||
public void setOperationTimeout(int operationTimeout) {
|
||||
this.operationTimeout = operationTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper that is used when grouping the actions per region server.
|
||||
*
|
||||
|
@ -493,24 +361,13 @@ class AsyncProcess {
|
|||
multiAction.add(regionName, action);
|
||||
}
|
||||
|
||||
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
|
||||
return submitAll(pool, tableName, rows, callback, results, null, -1);
|
||||
}
|
||||
/**
|
||||
* Submit immediately the list of rows, whatever the server status. Kept for backward
|
||||
* compatibility: it allows to be used with the batch interface that return an array of objects.
|
||||
*
|
||||
* @param pool ExecutorService to use.
|
||||
* @param tableName name of the table for which the submission is made.
|
||||
* @param rows the list of rows.
|
||||
* @param callback the callback.
|
||||
* @param results Optional array to return the results thru; backward compat.
|
||||
* @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
|
||||
* @param task The setting and data
|
||||
*/
|
||||
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
|
||||
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
|
||||
CancellableRegionServerCallable callable, int rpcTimeout) {
|
||||
private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
|
||||
RowAccess<? extends Row> rows = task.getRowAccess();
|
||||
List<Action> actions = new ArrayList<Action>(rows.size());
|
||||
|
||||
// The position will be used by the processBatch to match the object array returned.
|
||||
|
@ -528,93 +385,78 @@ class AsyncProcess {
|
|||
setNonce(ng, r, action);
|
||||
actions.add(action);
|
||||
}
|
||||
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
|
||||
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
|
||||
callable, rpcTimeout);
|
||||
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup());
|
||||
ars.groupAndSendMultiAction(actions, 1);
|
||||
return ars;
|
||||
}
|
||||
|
||||
private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
|
||||
if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
|
||||
return NO_REQS_RESULT;
|
||||
}
|
||||
Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
|
||||
checkOperationTimeout(task.getOperationTimeout());
|
||||
checkRpcTimeout(task.getRpcTimeout());
|
||||
return null;
|
||||
}
|
||||
|
||||
private void setNonce(NonceGenerator ng, Row r, Action action) {
|
||||
if (!(r instanceof Append) && !(r instanceof Increment)) return;
|
||||
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
|
||||
}
|
||||
|
||||
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
|
||||
TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool,
|
||||
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
|
||||
CancellableRegionServerCallable callable, int rpcTimeout) {
|
||||
return new AsyncRequestFutureImpl<CResult>(
|
||||
tableName, actions, nonceGroup, getPool(pool), needResults,
|
||||
results, callback, callable, operationTimeout,
|
||||
rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
|
||||
private int checkTimeout(String name, int timeout) {
|
||||
if (timeout < 0) {
|
||||
throw new RuntimeException("The " + name + " must be bigger than zero,"
|
||||
+ "current value is" + timeout);
|
||||
}
|
||||
return timeout;
|
||||
}
|
||||
private int checkOperationTimeout(int operationTimeout) {
|
||||
return checkTimeout("operation timeout", operationTimeout);
|
||||
}
|
||||
|
||||
private int checkRpcTimeout(int rpcTimeout) {
|
||||
return checkTimeout("rpc timeout", rpcTimeout);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
<CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
|
||||
AsyncProcessTask task, List<Action> actions, long nonceGroup) {
|
||||
return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
|
||||
}
|
||||
|
||||
/** Wait until the async does not have more than max tasks in progress. */
|
||||
protected void waitForMaximumCurrentTasks(int max, String tableName)
|
||||
protected void waitForMaximumCurrentTasks(int max, TableName tableName)
|
||||
throws InterruptedIOException {
|
||||
waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
|
||||
requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
|
||||
getLogger(tableName, max));
|
||||
}
|
||||
|
||||
// Break out this method so testable
|
||||
@VisibleForTesting
|
||||
void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
|
||||
String tableName) throws InterruptedIOException {
|
||||
long lastLog = EnvironmentEdgeManager.currentTime();
|
||||
long currentInProgress, oldInProgress = Long.MAX_VALUE;
|
||||
while ((currentInProgress = tasksInProgress.get()) > max) {
|
||||
if (oldInProgress != currentInProgress) { // Wait for in progress to change.
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
if (now > lastLog + 10000) {
|
||||
lastLog = now;
|
||||
LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
|
||||
+ max + ", tasksInProgress=" + currentInProgress +
|
||||
" hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
|
||||
if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
|
||||
logDetailsOfUndoneTasks(currentInProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
oldInProgress = currentInProgress;
|
||||
try {
|
||||
synchronized (tasksInProgress) {
|
||||
if (tasksInProgress.get() == oldInProgress) {
|
||||
tasksInProgress.wait(10);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("#" + id + ", interrupted." +
|
||||
" currentNumberOfTask=" + currentInProgress);
|
||||
}
|
||||
}
|
||||
private Consumer<Long> getLogger(TableName tableName, long max) {
|
||||
return (currentInProgress) -> {
|
||||
LOG.info("#" + id + (max < 0 ? ", waiting for any free slot"
|
||||
: ", waiting for some tasks to finish. Expected max="
|
||||
+ max) + ", tasksInProgress=" + currentInProgress +
|
||||
" hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
|
||||
};
|
||||
}
|
||||
|
||||
void logDetailsOfUndoneTasks(long taskInProgress) {
|
||||
ArrayList<ServerName> servers = new ArrayList<ServerName>();
|
||||
for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
|
||||
if (entry.getValue().get() > 0) {
|
||||
servers.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
|
||||
if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
|
||||
ArrayList<String> regions = new ArrayList<String>();
|
||||
for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
|
||||
if (entry.getValue().get() > 0) {
|
||||
regions.add(Bytes.toString(entry.getKey()));
|
||||
}
|
||||
}
|
||||
LOG.info("Regions against which left over task(s) are processed: " + regions);
|
||||
}
|
||||
void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
requestController.incTaskCounters(regions, sn);
|
||||
}
|
||||
|
||||
|
||||
void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
requestController.decTaskCounters(regions, sn);
|
||||
}
|
||||
/**
|
||||
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
|
||||
* @return Whether there were any errors in any request since the last time
|
||||
* {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created.
|
||||
*/
|
||||
public boolean hasError() {
|
||||
return globalErrors.hasErrors();
|
||||
return globalErrors != null && globalErrors.hasErrors();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -628,9 +470,9 @@ class AsyncProcess {
|
|||
* was called, or AP was created.
|
||||
*/
|
||||
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
|
||||
List<Row> failedRows, String tableName) throws InterruptedIOException {
|
||||
List<Row> failedRows, TableName tableName) throws InterruptedIOException {
|
||||
waitForMaximumCurrentTasks(0, tableName);
|
||||
if (!globalErrors.hasErrors()) {
|
||||
if (globalErrors == null || !globalErrors.hasErrors()) {
|
||||
return null;
|
||||
}
|
||||
if (failedRows != null) {
|
||||
|
@ -641,42 +483,13 @@ class AsyncProcess {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* increment the tasks counters for a given set of regions. MT safe.
|
||||
*/
|
||||
protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
tasksInProgress.incrementAndGet();
|
||||
|
||||
computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
|
||||
|
||||
for (byte[] regBytes : regions) {
|
||||
computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrements the counters for a given region and the region server. MT Safe.
|
||||
*/
|
||||
protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
for (byte[] regBytes : regions) {
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
|
||||
regionCnt.decrementAndGet();
|
||||
}
|
||||
|
||||
taskCounterPerServer.get(sn).decrementAndGet();
|
||||
tasksInProgress.decrementAndGet();
|
||||
synchronized (tasksInProgress) {
|
||||
tasksInProgress.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a caller. Isolated to be easily overridden in the tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected RpcRetryingCaller<AbstractResponse> createCaller(
|
||||
CancellableRegionServerCallable callable, int rpcTimeout) {
|
||||
return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
|
||||
return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
|
||||
}
|
||||
|
||||
|
||||
|
@ -687,7 +500,7 @@ class AsyncProcess {
|
|||
* We may benefit from connection-wide tracking of server errors.
|
||||
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
|
||||
*/
|
||||
protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
|
||||
ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
|
||||
return new ConnectionImplementation.ServerErrorTracker(
|
||||
this.serverTrackerTimeout, this.numTries);
|
||||
}
|
||||
|
@ -696,283 +509,4 @@ class AsyncProcess {
|
|||
return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect all advices from checkers and make the final decision.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class RowCheckerHost {
|
||||
private final List<RowChecker> checkers;
|
||||
private boolean isEnd = false;
|
||||
RowCheckerHost(final List<RowChecker> checkers) {
|
||||
this.checkers = checkers;
|
||||
}
|
||||
void reset() throws InterruptedIOException {
|
||||
isEnd = false;
|
||||
InterruptedIOException e = null;
|
||||
for (RowChecker checker : checkers) {
|
||||
try {
|
||||
checker.reset();
|
||||
} catch (InterruptedIOException ex) {
|
||||
e = ex;
|
||||
}
|
||||
}
|
||||
if (e != null) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
if (isEnd) {
|
||||
return ReturnCode.END;
|
||||
}
|
||||
ReturnCode code = ReturnCode.INCLUDE;
|
||||
for (RowChecker checker : checkers) {
|
||||
switch (checker.canTakeOperation(loc, rowSize)) {
|
||||
case END:
|
||||
isEnd = true;
|
||||
code = ReturnCode.END;
|
||||
break;
|
||||
case SKIP:
|
||||
code = ReturnCode.SKIP;
|
||||
break;
|
||||
case INCLUDE:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (code == ReturnCode.END) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (RowChecker checker : checkers) {
|
||||
checker.notifyFinal(code, loc, rowSize);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a way to control the flow of rows iteration.
|
||||
*/
|
||||
// Visible for Testing. Adding @VisibleForTesting here doesn't work for some reason.
|
||||
interface RowChecker {
|
||||
enum ReturnCode {
|
||||
/**
|
||||
* Accept current row.
|
||||
*/
|
||||
INCLUDE,
|
||||
/**
|
||||
* Skip current row.
|
||||
*/
|
||||
SKIP,
|
||||
/**
|
||||
* No more row can be included.
|
||||
*/
|
||||
END
|
||||
};
|
||||
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
|
||||
/**
|
||||
* Add the final ReturnCode to the checker.
|
||||
* The ReturnCode may be reversed, so the checker need the final decision to update
|
||||
* the inner state.
|
||||
*/
|
||||
void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
|
||||
/**
|
||||
* Reset the inner state.
|
||||
*/
|
||||
void reset() throws InterruptedIOException ;
|
||||
}
|
||||
|
||||
/**
|
||||
* limit the heapsize of total submitted data.
|
||||
* Reduce the limit of heapsize for submitting quickly
|
||||
* if there is no running task.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class SubmittedSizeChecker implements RowChecker {
|
||||
private final long maxHeapSizeSubmit;
|
||||
private long heapSize = 0;
|
||||
SubmittedSizeChecker(final long maxHeapSizeSubmit) {
|
||||
this.maxHeapSizeSubmit = maxHeapSizeSubmit;
|
||||
}
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
if (heapSize >= maxHeapSizeSubmit) {
|
||||
return ReturnCode.END;
|
||||
}
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
heapSize += rowSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
heapSize = 0;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* limit the max number of tasks in an AsyncProcess.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class TaskCountChecker implements RowChecker {
|
||||
private static final long MAX_WAITING_TIME = 1000; //ms
|
||||
private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
|
||||
private final Set<ServerName> serversIncluded = new HashSet<>();
|
||||
private final int maxConcurrentTasksPerRegion;
|
||||
private final int maxTotalConcurrentTasks;
|
||||
private final int maxConcurrentTasksPerServer;
|
||||
private final Map<byte[], AtomicInteger> taskCounterPerRegion;
|
||||
private final Map<ServerName, AtomicInteger> taskCounterPerServer;
|
||||
private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
private final AtomicLong tasksInProgress;
|
||||
TaskCountChecker(final int maxTotalConcurrentTasks,
|
||||
final int maxConcurrentTasksPerServer,
|
||||
final int maxConcurrentTasksPerRegion,
|
||||
final AtomicLong tasksInProgress,
|
||||
final Map<ServerName, AtomicInteger> taskCounterPerServer,
|
||||
final Map<byte[], AtomicInteger> taskCounterPerRegion) {
|
||||
this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
|
||||
this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
|
||||
this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
|
||||
this.taskCounterPerRegion = taskCounterPerRegion;
|
||||
this.taskCounterPerServer = taskCounterPerServer;
|
||||
this.tasksInProgress = tasksInProgress;
|
||||
}
|
||||
@Override
|
||||
public void reset() throws InterruptedIOException {
|
||||
// prevent the busy-waiting
|
||||
waitForRegion();
|
||||
regionsIncluded.clear();
|
||||
serversIncluded.clear();
|
||||
busyRegions.clear();
|
||||
}
|
||||
private void waitForRegion() throws InterruptedIOException {
|
||||
if (busyRegions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
final long start = ee.currentTime();
|
||||
while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
|
||||
for (byte[] region : busyRegions) {
|
||||
AtomicInteger count = taskCounterPerRegion.get(region);
|
||||
if (count == null || count.get() < maxConcurrentTasksPerRegion) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
synchronized (tasksInProgress) {
|
||||
tasksInProgress.wait(10);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted." +
|
||||
" tasksInProgress=" + tasksInProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 1) check the regions is allowed.
|
||||
* 2) check the concurrent tasks for regions.
|
||||
* 3) check the total concurrent tasks.
|
||||
* 4) check the concurrent tasks for server.
|
||||
* @param loc
|
||||
* @param rowSize
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
|
||||
HRegionInfo regionInfo = loc.getRegionInfo();
|
||||
if (regionsIncluded.contains(regionInfo)) {
|
||||
// We already know what to do with this region.
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
|
||||
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
|
||||
// Too many tasks on this region already.
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
int newServers = serversIncluded.size()
|
||||
+ (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
|
||||
if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
|
||||
// Too many tasks.
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
|
||||
if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
|
||||
// Too many tasks for this individual server
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
regionsIncluded.add(loc.getRegionInfo());
|
||||
serversIncluded.add(loc.getServerName());
|
||||
}
|
||||
busyRegions.add(loc.getRegionInfo().getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* limit the request size for each regionserver.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class RequestSizeChecker implements RowChecker {
|
||||
private final long maxHeapSizePerRequest;
|
||||
private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
|
||||
RequestSizeChecker(final long maxHeapSizePerRequest) {
|
||||
this.maxHeapSizePerRequest = maxHeapSizePerRequest;
|
||||
}
|
||||
@Override
|
||||
public void reset() {
|
||||
serverRequestSizes.clear();
|
||||
}
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
// Is it ok for limit of request size?
|
||||
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
|
||||
serverRequestSizes.get(loc.getServerName()) : 0L;
|
||||
// accept at least one request
|
||||
if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
|
||||
serverRequestSizes.get(loc.getServerName()) : 0L;
|
||||
serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ListRowAccess<T> implements RowAccess<T> {
|
||||
private final List<T> data;
|
||||
ListRowAccess(final List<T> data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return data.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return data.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<T> iterator() {
|
||||
return data.iterator();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,229 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
|
||||
/**
|
||||
* Contains the attributes of a task which will be executed
|
||||
* by {@link org.apache.hadoop.hbase.client.AsyncProcess}.
|
||||
* The attributes will be validated by AsyncProcess.
|
||||
* It's intended for advanced client applications.
|
||||
* @param <T> The type of response from server-side
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class AsyncProcessTask<T> {
|
||||
/**
|
||||
* The number of processed rows.
|
||||
* The AsyncProcess has traffic control which may reject some rows.
|
||||
*/
|
||||
public enum SubmittedRows {
|
||||
ALL,
|
||||
AT_LEAST_ONE,
|
||||
NORMAL
|
||||
}
|
||||
public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) {
|
||||
return new Builder<>(callback);
|
||||
}
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder<T> {
|
||||
|
||||
private ExecutorService pool;
|
||||
private TableName tableName;
|
||||
private RowAccess<? extends Row> rows;
|
||||
private SubmittedRows submittedRows = SubmittedRows.ALL;
|
||||
private Batch.Callback<T> callback;
|
||||
private boolean needResults;
|
||||
private int rpcTimeout;
|
||||
private int operationTimeout;
|
||||
private CancellableRegionServerCallable callable;
|
||||
private Object[] results;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
private Builder(Batch.Callback<T> callback) {
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
Builder<T> setResults(Object[] results) {
|
||||
this.results = results;
|
||||
if (results != null && results.length != 0) {
|
||||
setNeedResults(true);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setPool(ExecutorService pool) {
|
||||
this.pool = pool;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setRpcTimeout(int rpcTimeout) {
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setOperationTimeout(int operationTimeout) {
|
||||
this.operationTimeout = operationTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setTableName(TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setRowAccess(List<? extends Row> rows) {
|
||||
this.rows = new ListRowAccess<>(rows);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setRowAccess(RowAccess<? extends Row> rows) {
|
||||
this.rows = rows;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setSubmittedRows(SubmittedRows submittedRows) {
|
||||
this.submittedRows = submittedRows;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder<T> setNeedResults(boolean needResults) {
|
||||
this.needResults = needResults;
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder<T> setCallable(CancellableRegionServerCallable callable) {
|
||||
this.callable = callable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AsyncProcessTask<T> build() {
|
||||
return new AsyncProcessTask<>(pool, tableName, rows, submittedRows,
|
||||
callback, callable, needResults, rpcTimeout, operationTimeout, results);
|
||||
}
|
||||
}
|
||||
private final ExecutorService pool;
|
||||
private final TableName tableName;
|
||||
private final RowAccess<? extends Row> rows;
|
||||
private final SubmittedRows submittedRows;
|
||||
private final Batch.Callback<T> callback;
|
||||
private final CancellableRegionServerCallable callable;
|
||||
private final boolean needResults;
|
||||
private final int rpcTimeout;
|
||||
private final int operationTimeout;
|
||||
private final Object[] results;
|
||||
AsyncProcessTask(AsyncProcessTask<T> task) {
|
||||
this(task.getPool(), task.getTableName(), task.getRowAccess(),
|
||||
task.getSubmittedRows(), task.getCallback(), task.getCallable(),
|
||||
task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(),
|
||||
task.getResults());
|
||||
}
|
||||
AsyncProcessTask(ExecutorService pool, TableName tableName,
|
||||
RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback<T> callback,
|
||||
CancellableRegionServerCallable callable, boolean needResults,
|
||||
int rpcTimeout, int operationTimeout, Object[] results) {
|
||||
this.pool = pool;
|
||||
this.tableName = tableName;
|
||||
this.rows = rows;
|
||||
this.submittedRows = size;
|
||||
this.callback = callback;
|
||||
this.callable = callable;
|
||||
this.needResults = needResults;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.operationTimeout = operationTimeout;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
public int getOperationTimeout() {
|
||||
return operationTimeout;
|
||||
}
|
||||
|
||||
public ExecutorService getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
public TableName getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public RowAccess<? extends Row> getRowAccess() {
|
||||
return rows;
|
||||
}
|
||||
|
||||
public SubmittedRows getSubmittedRows() {
|
||||
return submittedRows;
|
||||
}
|
||||
|
||||
public Batch.Callback<T> getCallback() {
|
||||
return callback;
|
||||
}
|
||||
|
||||
CancellableRegionServerCallable getCallable() {
|
||||
return callable;
|
||||
}
|
||||
|
||||
Object[] getResults() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public boolean getNeedResults() {
|
||||
return needResults;
|
||||
}
|
||||
|
||||
public int getRpcTimeout() {
|
||||
return rpcTimeout;
|
||||
}
|
||||
|
||||
static class ListRowAccess<T> implements RowAccess<T> {
|
||||
|
||||
private final List<T> data;
|
||||
|
||||
ListRowAccess(final List<T> data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return data.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return data.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<T> iterator() {
|
||||
return data.iterator();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -300,11 +300,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
private final int[] replicaGetIndices;
|
||||
private final boolean hasAnyReplicaGets;
|
||||
private final long nonceGroup;
|
||||
private CancellableRegionServerCallable currentCallable;
|
||||
private int operationTimeout;
|
||||
private int rpcTimeout;
|
||||
private final CancellableRegionServerCallable currentCallable;
|
||||
private final int operationTimeout;
|
||||
private final int rpcTimeout;
|
||||
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
|
||||
protected AsyncProcess asyncProcess;
|
||||
private final AsyncProcess asyncProcess;
|
||||
|
||||
/**
|
||||
* For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
|
||||
|
@ -339,32 +339,27 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public AsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
|
||||
ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
|
||||
CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
|
||||
AsyncProcess asyncProcess) {
|
||||
this.pool = pool;
|
||||
this.callback = callback;
|
||||
public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
|
||||
long nonceGroup, AsyncProcess asyncProcess) {
|
||||
this.pool = task.getPool();
|
||||
this.callback = task.getCallback();
|
||||
this.nonceGroup = nonceGroup;
|
||||
this.tableName = tableName;
|
||||
this.tableName = task.getTableName();
|
||||
this.actionsInProgress.set(actions.size());
|
||||
if (results != null) {
|
||||
assert needResults;
|
||||
if (results.length != actions.size()) {
|
||||
if (task.getResults() == null) {
|
||||
results = task.getNeedResults() ? new Object[actions.size()] : null;
|
||||
} else {
|
||||
if (task.getResults().length != actions.size()) {
|
||||
throw new AssertionError("results.length");
|
||||
}
|
||||
this.results = results;
|
||||
this.results = task.getResults();
|
||||
for (int i = 0; i != this.results.length; ++i) {
|
||||
results[i] = null;
|
||||
}
|
||||
} else {
|
||||
this.results = needResults ? new Object[actions.size()] : null;
|
||||
}
|
||||
List<Integer> replicaGetIndices = null;
|
||||
boolean hasAnyReplicaGets = false;
|
||||
if (needResults) {
|
||||
if (results != null) {
|
||||
// Check to see if any requests might require replica calls.
|
||||
// We expect that many requests will consist of all or no multi-replica gets; in such
|
||||
// cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
|
||||
|
@ -414,10 +409,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
this.errorsByServer = createServerErrorTracker();
|
||||
this.errors = (asyncProcess.globalErrors != null)
|
||||
? asyncProcess.globalErrors : new BatchErrors();
|
||||
this.operationTimeout = operationTimeout;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.currentCallable = callable;
|
||||
if (callable == null) {
|
||||
this.operationTimeout = task.getOperationTimeout();
|
||||
this.rpcTimeout = task.getRpcTimeout();
|
||||
this.currentCallable = task.getCallable();
|
||||
if (task.getCallable() == null) {
|
||||
tracker = new RetryingTimeTracker().start();
|
||||
}
|
||||
}
|
||||
|
@ -1246,9 +1241,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
lastLog = now;
|
||||
LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
|
||||
+ " actions to finish on table: " + tableName);
|
||||
if (currentInProgress <= asyncProcess.thresholdToLogUndoneTaskDetails) {
|
||||
asyncProcess.logDetailsOfUndoneTasks(currentInProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
synchronized (actionsInProgress) {
|
||||
|
|
|
@ -19,12 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Collections;
|
||||
|
@ -36,6 +33,8 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -67,61 +66,70 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
"hbase.client.bufferedmutator.classname";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
|
||||
|
||||
|
||||
private final ExceptionListener listener;
|
||||
|
||||
protected ClusterConnection connection; // non-final so can be overridden in test
|
||||
private final TableName tableName;
|
||||
private volatile Configuration conf;
|
||||
|
||||
@VisibleForTesting
|
||||
final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
|
||||
@VisibleForTesting
|
||||
AtomicLong currentWriteBufferSize = new AtomicLong(0);
|
||||
|
||||
private final Configuration conf;
|
||||
private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
|
||||
private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
|
||||
/**
|
||||
* Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
|
||||
* The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
AtomicInteger undealtMutationCount = new AtomicInteger(0);
|
||||
private long writeBufferSize;
|
||||
private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
|
||||
private volatile long writeBufferSize;
|
||||
private final int maxKeyValueSize;
|
||||
private boolean closed = false;
|
||||
private final ExecutorService pool;
|
||||
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
|
||||
private int operationTimeout;
|
||||
private final AtomicInteger rpcTimeout;
|
||||
private final AtomicInteger operationTimeout;
|
||||
private final boolean cleanupPoolOnClose;
|
||||
private volatile boolean closed = false;
|
||||
private final AsyncProcess ap;
|
||||
|
||||
@VisibleForTesting
|
||||
protected AsyncProcess ap; // non-final so can be overridden in test
|
||||
|
||||
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
|
||||
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
|
||||
BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
|
||||
if (conn == null || conn.isClosed()) {
|
||||
throw new IllegalArgumentException("Connection is null or closed.");
|
||||
}
|
||||
|
||||
this.tableName = params.getTableName();
|
||||
this.connection = conn;
|
||||
this.conf = connection.getConfiguration();
|
||||
this.pool = params.getPool();
|
||||
this.conf = conn.getConfiguration();
|
||||
this.listener = params.getListener();
|
||||
|
||||
if (params.getPool() == null) {
|
||||
this.pool = HTable.getDefaultExecutor(conf);
|
||||
cleanupPoolOnClose = true;
|
||||
} else {
|
||||
this.pool = params.getPool();
|
||||
cleanupPoolOnClose = false;
|
||||
}
|
||||
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
|
||||
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
|
||||
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
|
||||
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
|
||||
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
|
||||
|
||||
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.operationTimeout = conn.getConfiguration().getInt(
|
||||
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
// puts need to track errors globally due to how the APIs currently work.
|
||||
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
|
||||
writeRpcTimeout, operationTimeout);
|
||||
this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
|
||||
params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
|
||||
this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ?
|
||||
params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
|
||||
this.ap = ap;
|
||||
}
|
||||
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
|
||||
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
|
||||
this(conn, params,
|
||||
// puts need to track errors globally due to how the APIs currently work.
|
||||
new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ExecutorService getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
AsyncProcess getAsyncProcess() {
|
||||
return ap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -193,22 +201,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
// As we can have an operation in progress even if the buffer is empty, we call
|
||||
// backgroundFlushCommits at least one time.
|
||||
backgroundFlushCommits(true);
|
||||
this.pool.shutdown();
|
||||
boolean terminated;
|
||||
int loopCnt = 0;
|
||||
do {
|
||||
// wait until the pool has terminated
|
||||
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
|
||||
loopCnt += 1;
|
||||
if (loopCnt >= 10) {
|
||||
LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
|
||||
break;
|
||||
}
|
||||
} while (!terminated);
|
||||
|
||||
if (cleanupPoolOnClose) {
|
||||
this.pool.shutdown();
|
||||
boolean terminated;
|
||||
int loopCnt = 0;
|
||||
do {
|
||||
// wait until the pool has terminated
|
||||
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
|
||||
loopCnt += 1;
|
||||
if (loopCnt >= 10) {
|
||||
LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
|
||||
break;
|
||||
}
|
||||
} while (!terminated);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("waitForTermination interrupted");
|
||||
|
||||
} finally {
|
||||
this.closed = true;
|
||||
}
|
||||
|
@ -239,8 +247,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
|
||||
if (!synchronous) {
|
||||
QueueRowAccess taker = new QueueRowAccess();
|
||||
AsyncProcessTask task = wrapAsyncProcessTask(taker);
|
||||
try {
|
||||
ap.submit(tableName, taker, true, null, false);
|
||||
ap.submit(task);
|
||||
if (ap.hasError()) {
|
||||
LOG.debug(tableName + ": One or more of the operations have failed -"
|
||||
+ " waiting for all operation in progress to finish (successfully or not)");
|
||||
|
@ -251,17 +260,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
if (synchronous || ap.hasError()) {
|
||||
QueueRowAccess taker = new QueueRowAccess();
|
||||
AsyncProcessTask task = wrapAsyncProcessTask(taker);
|
||||
try {
|
||||
while (!taker.isEmpty()) {
|
||||
ap.submit(tableName, taker, true, null, false);
|
||||
ap.submit(task);
|
||||
taker.reset();
|
||||
}
|
||||
} finally {
|
||||
taker.restoreRemainder();
|
||||
}
|
||||
|
||||
RetriesExhaustedWithDetailsException error =
|
||||
ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
|
||||
ap.waitForAllPreviousOpsAndReset(null, tableName);
|
||||
if (error != null) {
|
||||
if (listener == null) {
|
||||
throw error;
|
||||
|
@ -272,9 +281,39 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
|
||||
* @param taker access the inner buffer.
|
||||
* @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
|
||||
*/
|
||||
private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(taker)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
|
||||
.build();
|
||||
return new AsyncProcessTask(task) {
|
||||
@Override
|
||||
public int getRpcTimeout() {
|
||||
return rpcTimeout.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOperationTimeout() {
|
||||
return operationTimeout.get();
|
||||
}
|
||||
};
|
||||
}
|
||||
/**
|
||||
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
|
||||
* not be called for production uses.
|
||||
* If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()}
|
||||
* will be called.
|
||||
* @param writeBufferSize The max size of internal buffer where data is stored.
|
||||
* @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
|
||||
* if an I/O error occurs and there are too many retries.
|
||||
* @throws java.io.InterruptedIOException if the I/O task is interrupted.
|
||||
* @deprecated Going away when we drop public support for {@link HTable}.
|
||||
*/
|
||||
@Deprecated
|
||||
|
@ -295,15 +334,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setRpcTimeout(int timeout) {
|
||||
this.writeRpcTimeout = timeout;
|
||||
ap.setRpcTimeout(timeout);
|
||||
public void setRpcTimeout(int rpcTimeout) {
|
||||
this.rpcTimeout.set(rpcTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOperationTimeout(int timeout) {
|
||||
this.operationTimeout = timeout;
|
||||
ap.setOperationTimeout(operationTimeout);
|
||||
public void setOperationTimeout(int operationTimeout) {
|
||||
this.operationTimeout.set(operationTimeout);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getCurrentWriteBufferSize() {
|
||||
return currentWriteBufferSize.get();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int size() {
|
||||
return undealtMutationCount.get();
|
||||
}
|
||||
|
||||
private class QueueRowAccess implements RowAccess<Row> {
|
||||
|
|
|
@ -39,7 +39,8 @@ public class BufferedMutatorParams implements Cloneable {
|
|||
private int maxKeyValueSize = UNSET;
|
||||
private ExecutorService pool = null;
|
||||
private String implementationClassName = null;
|
||||
|
||||
private int rpcTimeout = UNSET;
|
||||
private int operationTimeout = UNSET;
|
||||
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
|
||||
@Override
|
||||
public void onException(RetriesExhaustedWithDetailsException exception,
|
||||
|
@ -61,6 +62,24 @@ public class BufferedMutatorParams implements Cloneable {
|
|||
return writeBufferSize;
|
||||
}
|
||||
|
||||
public BufferedMutatorParams rpcTimeout(final int rpcTimeout) {
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getRpcTimeout() {
|
||||
return rpcTimeout;
|
||||
}
|
||||
|
||||
public BufferedMutatorParams opertationTimeout(final int operationTimeout) {
|
||||
this.operationTimeout = operationTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getOperationTimeout() {
|
||||
return operationTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the write buffer size specified by the provided {@link Connection}'s
|
||||
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
|
||||
|
|
|
@ -42,7 +42,8 @@ public class ConnectionConfiguration {
|
|||
private final int replicaCallTimeoutMicroSecondScan;
|
||||
private final int retries;
|
||||
private final int maxKeyValueSize;
|
||||
|
||||
private final int readRpcTimeout;
|
||||
private final int writeRpcTimeout;
|
||||
// toggle for async/sync prefetch
|
||||
private final boolean clientScannerAsyncPrefetch;
|
||||
|
||||
|
@ -80,6 +81,12 @@ public class ConnectionConfiguration {
|
|||
Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
|
||||
|
||||
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
|
||||
|
||||
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
|
||||
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -99,6 +106,16 @@ public class ConnectionConfiguration {
|
|||
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
|
||||
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
|
||||
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
|
||||
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
}
|
||||
|
||||
public int getReadRpcTimeout() {
|
||||
return readRpcTimeout;
|
||||
}
|
||||
|
||||
public int getWriteRpcTimeout() {
|
||||
return writeRpcTimeout;
|
||||
}
|
||||
|
||||
public long getWriteBufferSize() {
|
||||
|
|
|
@ -249,7 +249,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
|
||||
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
|
||||
this.asyncProcess = createAsyncProcess(this.conf);
|
||||
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
this.metrics = new MetricsConnection(this);
|
||||
} else {
|
||||
|
@ -1833,17 +1833,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
metaCache.clearCache(regionInfo);
|
||||
}
|
||||
|
||||
// For tests to override.
|
||||
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
||||
// No default pool available.
|
||||
int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||
int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
|
||||
rpcTimeout, operationTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncProcess getAsyncProcess() {
|
||||
return asyncProcess;
|
||||
|
|
|
@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads;
|
|||
@InterfaceStability.Stable
|
||||
public class HTable implements Table {
|
||||
private static final Log LOG = LogFactory.getLog(HTable.class);
|
||||
protected ClusterConnection connection;
|
||||
private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
|
||||
private final ClusterConnection connection;
|
||||
private final TableName tableName;
|
||||
private volatile Configuration configuration;
|
||||
private ConnectionConfiguration connConfiguration;
|
||||
protected BufferedMutatorImpl mutator;
|
||||
private final Configuration configuration;
|
||||
private final ConnectionConfiguration connConfiguration;
|
||||
@VisibleForTesting
|
||||
BufferedMutatorImpl mutator;
|
||||
private boolean closed = false;
|
||||
protected int scannerCaching;
|
||||
protected long scannerMaxResultSize;
|
||||
private ExecutorService pool; // For Multi & Scan
|
||||
private final int scannerCaching;
|
||||
private final long scannerMaxResultSize;
|
||||
private final ExecutorService pool; // For Multi & Scan
|
||||
private int operationTimeout; // global timeout for each blocking method with retrying rpc
|
||||
private int readRpcTimeout; // timeout for each read rpc request
|
||||
private int writeRpcTimeout; // timeout for each write rpc request
|
||||
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
||||
private final boolean cleanupConnectionOnClose; // close the connection in close()
|
||||
private Consistency defaultConsistency = Consistency.STRONG;
|
||||
private HRegionLocator locator;
|
||||
private final HRegionLocator locator;
|
||||
|
||||
/** The Async process for batch */
|
||||
protected AsyncProcess multiAp;
|
||||
private RpcRetryingCallerFactory rpcCallerFactory;
|
||||
private RpcControllerFactory rpcControllerFactory;
|
||||
@VisibleForTesting
|
||||
AsyncProcess multiAp;
|
||||
private final RpcRetryingCallerFactory rpcCallerFactory;
|
||||
private final RpcControllerFactory rpcControllerFactory;
|
||||
|
||||
// Marked Private @since 1.0
|
||||
@InterfaceAudience.Private
|
||||
|
@ -167,22 +168,42 @@ public class HTable implements Table {
|
|||
throw new IllegalArgumentException("Given table name is null");
|
||||
}
|
||||
this.tableName = tableName;
|
||||
this.cleanupConnectionOnClose = false;
|
||||
this.connection = connection;
|
||||
this.configuration = connection.getConfiguration();
|
||||
this.connConfiguration = tableConfig;
|
||||
this.pool = pool;
|
||||
if (tableConfig == null) {
|
||||
connConfiguration = new ConnectionConfiguration(configuration);
|
||||
} else {
|
||||
connConfiguration = tableConfig;
|
||||
}
|
||||
if (pool == null) {
|
||||
this.pool = getDefaultExecutor(this.configuration);
|
||||
this.cleanupPoolOnClose = true;
|
||||
} else {
|
||||
this.pool = pool;
|
||||
this.cleanupPoolOnClose = false;
|
||||
}
|
||||
if (rpcCallerFactory == null) {
|
||||
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
|
||||
} else {
|
||||
this.rpcCallerFactory = rpcCallerFactory;
|
||||
}
|
||||
|
||||
this.rpcCallerFactory = rpcCallerFactory;
|
||||
this.rpcControllerFactory = rpcControllerFactory;
|
||||
if (rpcControllerFactory == null) {
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
|
||||
} else {
|
||||
this.rpcControllerFactory = rpcControllerFactory;
|
||||
}
|
||||
|
||||
this.finishSetup();
|
||||
this.operationTimeout = tableName.isSystemTable() ?
|
||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
|
||||
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
|
||||
// puts need to track errors globally due to how the APIs currently work.
|
||||
multiAp = this.connection.getAsyncProcess();
|
||||
this.locator = new HRegionLocator(tableName, connection);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -190,20 +211,23 @@ public class HTable implements Table {
|
|||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
|
||||
protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
|
||||
connection = conn;
|
||||
tableName = params.getTableName();
|
||||
connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
|
||||
this.tableName = mutator.getName();
|
||||
this.configuration = connection.getConfiguration();
|
||||
connConfiguration = new ConnectionConfiguration(configuration);
|
||||
cleanupPoolOnClose = false;
|
||||
cleanupConnectionOnClose = false;
|
||||
// used from tests, don't trust the connection is real
|
||||
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
|
||||
this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.mutator = mutator;
|
||||
this.operationTimeout = tableName.isSystemTable() ?
|
||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
|
||||
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
this.rpcControllerFactory = null;
|
||||
this.rpcCallerFactory = null;
|
||||
this.pool = mutator.getPool();
|
||||
this.locator = null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -213,36 +237,6 @@ public class HTable implements Table {
|
|||
return conf.getInt("hbase.client.keyvalue.maxsize", -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* setup this HTable's parameter based on the passed configuration
|
||||
*/
|
||||
private void finishSetup() throws IOException {
|
||||
if (connConfiguration == null) {
|
||||
connConfiguration = new ConnectionConfiguration(configuration);
|
||||
}
|
||||
|
||||
this.operationTimeout = tableName.isSystemTable() ?
|
||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
if (this.rpcCallerFactory == null) {
|
||||
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
|
||||
}
|
||||
if (this.rpcControllerFactory == null) {
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
|
||||
}
|
||||
|
||||
// puts need to track errors globally due to how the APIs currently work.
|
||||
multiAp = this.connection.getAsyncProcess();
|
||||
this.locator = new HRegionLocator(getName(), connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
@ -423,7 +417,7 @@ public class HTable implements Table {
|
|||
get = ReflectionUtils.newInstance(get.getClass(), get);
|
||||
get.setCheckExistenceOnly(checkExistenceOnly);
|
||||
if (get.getConsistency() == null){
|
||||
get.setConsistency(defaultConsistency);
|
||||
get.setConsistency(DEFAULT_CONSISTENCY);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -483,13 +477,37 @@ public class HTable implements Table {
|
|||
@Override
|
||||
public void batch(final List<? extends Row> actions, final Object[] results)
|
||||
throws InterruptedException, IOException {
|
||||
batch(actions, results, -1);
|
||||
int rpcTimeout = writeRpcTimeout;
|
||||
boolean hasRead = false;
|
||||
boolean hasWrite = false;
|
||||
for (Row action : actions) {
|
||||
if (action instanceof Mutation) {
|
||||
hasWrite = true;
|
||||
} else {
|
||||
hasRead = true;
|
||||
}
|
||||
if (hasRead && hasWrite) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (hasRead && !hasWrite) {
|
||||
rpcTimeout = readRpcTimeout;
|
||||
}
|
||||
batch(actions, results, rpcTimeout);
|
||||
}
|
||||
|
||||
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
|
||||
throws InterruptedException, IOException {
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
|
||||
rpcTimeout);
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(actions)
|
||||
.setResults(results)
|
||||
.setRpcTimeout(rpcTimeout)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -509,8 +527,20 @@ public class HTable implements Table {
|
|||
public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
|
||||
Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
|
||||
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
|
||||
AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
|
||||
pool, tableName, actions, callback, results);
|
||||
int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
|
||||
int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(actions)
|
||||
.setResults(results)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setRpcTimeout(writeTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -536,8 +566,16 @@ public class HTable implements Table {
|
|||
}
|
||||
};
|
||||
List<Delete> rows = Collections.singletonList(delete);
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
|
||||
null, null, callable, writeRpcTimeout);
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(rows)
|
||||
.setCallable(callable)
|
||||
.setRpcTimeout(writeRpcTimeout)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -615,8 +653,16 @@ public class HTable implements Table {
|
|||
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
|
||||
}
|
||||
};
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
|
||||
null, null, callable, writeRpcTimeout);
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(rm.getMutations())
|
||||
.setCallable(callable)
|
||||
.setRpcTimeout(writeRpcTimeout)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -795,8 +841,18 @@ public class HTable implements Table {
|
|||
};
|
||||
List<Delete> rows = Collections.singletonList(delete);
|
||||
Object[] results = new Object[1];
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
|
||||
null, results, callable, -1);
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(rows)
|
||||
.setCallable(callable)
|
||||
// TODO any better timeout?
|
||||
.setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.setResults(results)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -839,8 +895,18 @@ public class HTable implements Table {
|
|||
* It is excessive to send such a large array, but that is required by the framework right now
|
||||
* */
|
||||
Object[] results = new Object[rm.getMutations().size()];
|
||||
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
|
||||
null, results, callable, -1);
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(rm.getMutations())
|
||||
.setResults(results)
|
||||
.setCallable(callable)
|
||||
// TODO any better timeout?
|
||||
.setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture ars = multiAp.submit(task);
|
||||
ars.waitUntilDone();
|
||||
if (ars.hasError()) {
|
||||
throw ars.getErrors();
|
||||
|
@ -926,6 +992,10 @@ public class HTable implements Table {
|
|||
return;
|
||||
}
|
||||
flushCommits();
|
||||
if (mutator != null) {
|
||||
mutator.close();
|
||||
mutator = null;
|
||||
}
|
||||
if (cleanupPoolOnClose) {
|
||||
this.pool.shutdown();
|
||||
try {
|
||||
|
@ -939,11 +1009,6 @@ public class HTable implements Table {
|
|||
LOG.warn("waitForTermination interrupted");
|
||||
}
|
||||
}
|
||||
if (cleanupConnectionOnClose) {
|
||||
if (this.connection != null) {
|
||||
this.connection.close();
|
||||
}
|
||||
}
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
|
@ -1102,7 +1167,6 @@ public class HTable implements Table {
|
|||
if (mutator != null) {
|
||||
mutator.setOperationTimeout(operationTimeout);
|
||||
}
|
||||
multiAp.setOperationTimeout(operationTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1134,7 +1198,6 @@ public class HTable implements Table {
|
|||
if (mutator != null) {
|
||||
mutator.setRpcTimeout(writeRpcTimeout);
|
||||
}
|
||||
multiAp.setRpcTimeout(writeRpcTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1217,37 +1280,41 @@ public class HTable implements Table {
|
|||
Object[] results = new Object[execs.size()];
|
||||
|
||||
AsyncProcess asyncProcess =
|
||||
new AsyncProcess(connection, configuration, pool,
|
||||
new AsyncProcess(connection, configuration,
|
||||
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
|
||||
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
|
||||
operationTimeout);
|
||||
|
||||
AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
|
||||
new Callback<ClientProtos.CoprocessorServiceResult>() {
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row,
|
||||
ClientProtos.CoprocessorServiceResult serviceResult) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
|
||||
": region=" + Bytes.toStringBinary(region) +
|
||||
", row=" + Bytes.toStringBinary(row) +
|
||||
", value=" + serviceResult.getValue().getValue());
|
||||
}
|
||||
try {
|
||||
Message.Builder builder = responsePrototype.newBuilderForType();
|
||||
org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
|
||||
serviceResult.getValue().getValue().toByteArray());
|
||||
callback.update(region, row, (R) builder.build());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
|
||||
e);
|
||||
callbackErrorExceptions.add(e);
|
||||
callbackErrorActions.add(execsByRow.get(row));
|
||||
callbackErrorServers.add("null");
|
||||
}
|
||||
}
|
||||
}, results);
|
||||
true, RpcControllerFactory.instantiate(configuration));
|
||||
|
||||
Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
|
||||
= (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
|
||||
": region=" + Bytes.toStringBinary(region) +
|
||||
", row=" + Bytes.toStringBinary(row) +
|
||||
", value=" + serviceResult.getValue().getValue());
|
||||
}
|
||||
try {
|
||||
Message.Builder builder = responsePrototype.newBuilderForType();
|
||||
org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
|
||||
serviceResult.getValue().getValue().toByteArray());
|
||||
callback.update(region, row, (R) builder.build());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
|
||||
e);
|
||||
callbackErrorExceptions.add(e);
|
||||
callbackErrorActions.add(execsByRow.get(row));
|
||||
callbackErrorServers.add("null");
|
||||
}
|
||||
};
|
||||
AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = AsyncProcessTask.newBuilder(resultsCallback)
|
||||
.setPool(pool)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(execs)
|
||||
.setResults(results)
|
||||
.setRpcTimeout(readRpcTimeout)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
||||
.build();
|
||||
AsyncRequestFuture future = asyncProcess.submit(task);
|
||||
future.waitUntilDone();
|
||||
|
||||
if (future.hasError()) {
|
||||
|
@ -1270,10 +1337,10 @@ public class HTable implements Table {
|
|||
.pool(pool)
|
||||
.writeBufferSize(connConfiguration.getWriteBufferSize())
|
||||
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
|
||||
.opertationTimeout(operationTimeout)
|
||||
.rpcTimeout(writeRpcTimeout)
|
||||
);
|
||||
}
|
||||
mutator.setRpcTimeout(writeRpcTimeout);
|
||||
mutator.setOperationTimeout(operationTimeout);
|
||||
return mutator;
|
||||
}
|
||||
}
|
|
@ -443,7 +443,7 @@ public class HTableMultiplexer {
|
|||
private final AtomicInteger retryInQueue = new AtomicInteger(0);
|
||||
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
|
||||
private final int operationTimeout;
|
||||
|
||||
private final ExecutorService pool;
|
||||
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
|
||||
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
|
||||
ExecutorService pool, ScheduledExecutorService executor) {
|
||||
|
@ -457,10 +457,10 @@ public class HTableMultiplexer {
|
|||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
|
||||
writeRpcTimeout, operationTimeout);
|
||||
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
|
||||
this.executor = executor;
|
||||
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
protected LinkedBlockingQueue<PutStatus> getQueue() {
|
||||
|
@ -594,9 +594,14 @@ public class HTableMultiplexer {
|
|||
Map<ServerName, MultiAction> actionsByServer =
|
||||
Collections.singletonMap(server, actions);
|
||||
try {
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setResults(results)
|
||||
.setPool(pool)
|
||||
.setRpcTimeout(writeRpcTimeout)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.build();
|
||||
AsyncRequestFuture arf =
|
||||
ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
|
||||
null, actionsByServer, null);
|
||||
ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
|
||||
arf.waitUntilDone();
|
||||
if (arf.hasError()) {
|
||||
// We just log and ignore the exception here since failed Puts will be resubmit again.
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Collection;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* An interface for client request scheduling algorithm.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public interface RequestController {
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public enum ReturnCode {
|
||||
/**
|
||||
* Accept current row.
|
||||
*/
|
||||
INCLUDE,
|
||||
/**
|
||||
* Skip current row.
|
||||
*/
|
||||
SKIP,
|
||||
/**
|
||||
* No more row can be included.
|
||||
*/
|
||||
END
|
||||
}
|
||||
|
||||
/**
|
||||
* Picks up the valid data.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public interface Checker {
|
||||
/**
|
||||
* Checks the data whether it is valid to submit.
|
||||
* @param loc the destination of data
|
||||
* @param row the data to check
|
||||
* @return describe the decision for the row
|
||||
*/
|
||||
ReturnCode canTakeRow(HRegionLocation loc, Row row);
|
||||
|
||||
/**
|
||||
* Reset the state of the scheduler when completing the iteration of rows.
|
||||
* @throws InterruptedIOException some controller may wait
|
||||
* for some busy region or RS to complete the undealt request.
|
||||
*/
|
||||
void reset() throws InterruptedIOException ;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A new checker for evaluating a batch rows.
|
||||
*/
|
||||
Checker newChecker();
|
||||
|
||||
/**
|
||||
* Increment the counter if we build a valid task.
|
||||
* @param regions The destination of task
|
||||
* @param sn The target server
|
||||
*/
|
||||
void incTaskCounters(Collection<byte[]> regions, ServerName sn);
|
||||
|
||||
/**
|
||||
* Decrement the counter if a task is accomplished.
|
||||
* @param regions The destination of task
|
||||
* @param sn The target server
|
||||
*/
|
||||
void decTaskCounters(Collection<byte[]> regions, ServerName sn);
|
||||
|
||||
/**
|
||||
* @return The number of running task.
|
||||
*/
|
||||
long getNumberOfTsksInProgress();
|
||||
|
||||
/**
|
||||
* Waits for the running tasks to complete.
|
||||
* If there are specified threshold and trigger, the implementation should
|
||||
* wake up once in a while for checking the threshold and calling trigger.
|
||||
* @param max This method will return if the number of running tasks is
|
||||
* less than or equal to max.
|
||||
* @param id the caller's id
|
||||
* @param periodToTrigger The period to invoke the trigger. This value is a
|
||||
* hint. The real period depends on the implementation.
|
||||
* @param trigger The object to call periodically.
|
||||
* @throws java.io.InterruptedIOException If the waiting is interrupted
|
||||
*/
|
||||
void waitForMaximumCurrentTasks(long max, long id,
|
||||
int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException;
|
||||
|
||||
/**
|
||||
* Wait until there is at least one slot for a new task.
|
||||
* @param id the caller's id
|
||||
* @param periodToTrigger The period to invoke the trigger. This value is a
|
||||
* hint. The real period depends on the implementation.
|
||||
* @param trigger The object to call periodically.
|
||||
* @throws java.io.InterruptedIOException If the waiting is interrupted
|
||||
*/
|
||||
void waitForFreeSlot(long id, int periodToTrigger,
|
||||
Consumer<Long> trigger) throws InterruptedIOException;
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* A factory class that constructs an {@link org.apache.hadoop.hbase.client.RequestController}.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public final class RequestControllerFactory {
|
||||
public static final String REQUEST_CONTROLLER_IMPL_CONF_KEY = "hbase.client.request.controller.impl";
|
||||
/**
|
||||
* Constructs a {@link org.apache.hadoop.hbase.client.RequestController}.
|
||||
* @param conf The {@link Configuration} to use.
|
||||
* @return A RequestController which is built according to the configuration.
|
||||
*/
|
||||
public static RequestController create(Configuration conf) {
|
||||
Class<? extends RequestController> clazz= conf.getClass(REQUEST_CONTROLLER_IMPL_CONF_KEY,
|
||||
SimpleRequestController.class, RequestController.class);
|
||||
return ReflectionUtils.newInstance(clazz, conf);
|
||||
}
|
||||
}
|
|
@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@VisibleForTesting
|
||||
interface RowAccess<T> extends Iterable<T> {
|
||||
public interface RowAccess<T> extends Iterable<T> {
|
||||
/**
|
||||
* @return true if there are no elements.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,519 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Holds back the request if the submitted size or number has reached the
|
||||
* threshold.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
class SimpleRequestController implements RequestController {
|
||||
private static final Log LOG = LogFactory.getLog(SimpleRequestController.class);
|
||||
/**
|
||||
* The maximum size of single RegionServer.
|
||||
*/
|
||||
public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
|
||||
|
||||
/**
|
||||
* Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
|
||||
|
||||
/**
|
||||
* The maximum size of submit.
|
||||
*/
|
||||
public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
|
||||
/**
|
||||
* Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
|
||||
@VisibleForTesting
|
||||
final AtomicLong tasksInProgress = new AtomicLong(0);
|
||||
@VisibleForTesting
|
||||
final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
|
||||
= new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
|
||||
@VisibleForTesting
|
||||
final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
|
||||
/**
|
||||
* The number of tasks simultaneously executed on the cluster.
|
||||
*/
|
||||
private final int maxTotalConcurrentTasks;
|
||||
|
||||
/**
|
||||
* The max heap size of all tasks simultaneously executed on a server.
|
||||
*/
|
||||
private final long maxHeapSizePerRequest;
|
||||
private final long maxHeapSizeSubmit;
|
||||
/**
|
||||
* The number of tasks we run in parallel on a single region. With 1 (the
|
||||
* default) , we ensure that the ordering of the queries is respected: we
|
||||
* don't start a set of operations on a region before the previous one is
|
||||
* done. As well, this limits the pressure we put on the region server.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
final int maxConcurrentTasksPerRegion;
|
||||
|
||||
/**
|
||||
* The number of task simultaneously executed on a single region server.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
final int maxConcurrentTasksPerServer;
|
||||
private final int thresholdToLogUndoneTaskDetails;
|
||||
public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
|
||||
"hbase.client.threshold.log.details";
|
||||
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
|
||||
public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
|
||||
"hbase.client.threshold.log.region.details";
|
||||
private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
|
||||
private final int thresholdToLogRegionDetails;
|
||||
SimpleRequestController(final Configuration conf) {
|
||||
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
|
||||
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
|
||||
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
|
||||
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
|
||||
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
|
||||
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
|
||||
this.thresholdToLogUndoneTaskDetails =
|
||||
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
|
||||
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
|
||||
this.thresholdToLogRegionDetails =
|
||||
conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS,
|
||||
DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
|
||||
if (this.maxTotalConcurrentTasks <= 0) {
|
||||
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
|
||||
}
|
||||
if (this.maxConcurrentTasksPerServer <= 0) {
|
||||
throw new IllegalArgumentException("maxConcurrentTasksPerServer="
|
||||
+ maxConcurrentTasksPerServer);
|
||||
}
|
||||
if (this.maxConcurrentTasksPerRegion <= 0) {
|
||||
throw new IllegalArgumentException("maxConcurrentTasksPerRegion="
|
||||
+ maxConcurrentTasksPerRegion);
|
||||
}
|
||||
if (this.maxHeapSizePerRequest <= 0) {
|
||||
throw new IllegalArgumentException("maxHeapSizePerServer="
|
||||
+ maxHeapSizePerRequest);
|
||||
}
|
||||
|
||||
if (this.maxHeapSizeSubmit <= 0) {
|
||||
throw new IllegalArgumentException("maxHeapSizeSubmit="
|
||||
+ maxHeapSizeSubmit);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Checker newChecker(List<RowChecker> checkers) {
|
||||
return new Checker() {
|
||||
private boolean isEnd = false;
|
||||
|
||||
@Override
|
||||
public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
|
||||
if (isEnd) {
|
||||
return ReturnCode.END;
|
||||
}
|
||||
long rowSize = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
|
||||
ReturnCode code = ReturnCode.INCLUDE;
|
||||
for (RowChecker checker : checkers) {
|
||||
switch (checker.canTakeOperation(loc, rowSize)) {
|
||||
case END:
|
||||
isEnd = true;
|
||||
code = ReturnCode.END;
|
||||
break;
|
||||
case SKIP:
|
||||
code = ReturnCode.SKIP;
|
||||
break;
|
||||
case INCLUDE:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (code == ReturnCode.END) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (RowChecker checker : checkers) {
|
||||
checker.notifyFinal(code, loc, rowSize);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws InterruptedIOException {
|
||||
isEnd = false;
|
||||
InterruptedIOException e = null;
|
||||
for (RowChecker checker : checkers) {
|
||||
try {
|
||||
checker.reset();
|
||||
} catch (InterruptedIOException ex) {
|
||||
e = ex;
|
||||
}
|
||||
}
|
||||
if (e != null) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Checker newChecker() {
|
||||
List<RowChecker> checkers = new ArrayList<>(3);
|
||||
checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
|
||||
maxConcurrentTasksPerServer,
|
||||
maxConcurrentTasksPerRegion,
|
||||
tasksInProgress,
|
||||
taskCounterPerServer,
|
||||
taskCounterPerRegion));
|
||||
checkers.add(new RequestSizeChecker(maxHeapSizePerRequest));
|
||||
checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
|
||||
return newChecker(checkers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
tasksInProgress.incrementAndGet();
|
||||
|
||||
computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
|
||||
|
||||
regions.forEach((regBytes)
|
||||
-> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
regions.forEach(regBytes -> {
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
|
||||
regionCnt.decrementAndGet();
|
||||
});
|
||||
|
||||
taskCounterPerServer.get(sn).decrementAndGet();
|
||||
tasksInProgress.decrementAndGet();
|
||||
synchronized (tasksInProgress) {
|
||||
tasksInProgress.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumberOfTsksInProgress() {
|
||||
return tasksInProgress.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForMaximumCurrentTasks(long max, long id,
|
||||
int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
|
||||
assert max >= 0;
|
||||
long lastLog = EnvironmentEdgeManager.currentTime();
|
||||
long currentInProgress, oldInProgress = Long.MAX_VALUE;
|
||||
while ((currentInProgress = tasksInProgress.get()) > max) {
|
||||
if (oldInProgress != currentInProgress) { // Wait for in progress to change.
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
if (now > lastLog + periodToTrigger) {
|
||||
lastLog = now;
|
||||
if (trigger != null) {
|
||||
trigger.accept(currentInProgress);
|
||||
}
|
||||
logDetailsOfUndoneTasks(currentInProgress);
|
||||
}
|
||||
}
|
||||
oldInProgress = currentInProgress;
|
||||
try {
|
||||
synchronized (tasksInProgress) {
|
||||
if (tasksInProgress.get() == oldInProgress) {
|
||||
tasksInProgress.wait(10);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("#" + id + ", interrupted." +
|
||||
" currentNumberOfTask=" + currentInProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void logDetailsOfUndoneTasks(long taskInProgress) {
|
||||
if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
|
||||
ArrayList<ServerName> servers = new ArrayList<>();
|
||||
for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
|
||||
if (entry.getValue().get() > 0) {
|
||||
servers.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
|
||||
}
|
||||
|
||||
if (taskInProgress <= thresholdToLogRegionDetails) {
|
||||
ArrayList<String> regions = new ArrayList<>();
|
||||
for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
|
||||
if (entry.getValue().get() > 0) {
|
||||
regions.add(Bytes.toString(entry.getKey()));
|
||||
}
|
||||
}
|
||||
LOG.info("Regions against which left over task(s) are processed: " + regions);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
|
||||
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
|
||||
}
|
||||
|
||||
/**
|
||||
* limit the heapsize of total submitted data. Reduce the limit of heapsize
|
||||
* for submitting quickly if there is no running task.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class SubmittedSizeChecker implements RowChecker {
|
||||
|
||||
private final long maxHeapSizeSubmit;
|
||||
private long heapSize = 0;
|
||||
|
||||
SubmittedSizeChecker(final long maxHeapSizeSubmit) {
|
||||
this.maxHeapSizeSubmit = maxHeapSizeSubmit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
if (heapSize >= maxHeapSizeSubmit) {
|
||||
return ReturnCode.END;
|
||||
}
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
heapSize += rowSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
heapSize = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* limit the max number of tasks in an AsyncProcess.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class TaskCountChecker implements RowChecker {
|
||||
|
||||
private static final long MAX_WAITING_TIME = 1000; //ms
|
||||
private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
|
||||
private final Set<ServerName> serversIncluded = new HashSet<>();
|
||||
private final int maxConcurrentTasksPerRegion;
|
||||
private final int maxTotalConcurrentTasks;
|
||||
private final int maxConcurrentTasksPerServer;
|
||||
private final Map<byte[], AtomicInteger> taskCounterPerRegion;
|
||||
private final Map<ServerName, AtomicInteger> taskCounterPerServer;
|
||||
private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
private final AtomicLong tasksInProgress;
|
||||
|
||||
TaskCountChecker(final int maxTotalConcurrentTasks,
|
||||
final int maxConcurrentTasksPerServer,
|
||||
final int maxConcurrentTasksPerRegion,
|
||||
final AtomicLong tasksInProgress,
|
||||
final Map<ServerName, AtomicInteger> taskCounterPerServer,
|
||||
final Map<byte[], AtomicInteger> taskCounterPerRegion) {
|
||||
this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
|
||||
this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
|
||||
this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
|
||||
this.taskCounterPerRegion = taskCounterPerRegion;
|
||||
this.taskCounterPerServer = taskCounterPerServer;
|
||||
this.tasksInProgress = tasksInProgress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws InterruptedIOException {
|
||||
// prevent the busy-waiting
|
||||
waitForRegion();
|
||||
regionsIncluded.clear();
|
||||
serversIncluded.clear();
|
||||
busyRegions.clear();
|
||||
}
|
||||
|
||||
private void waitForRegion() throws InterruptedIOException {
|
||||
if (busyRegions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
final long start = ee.currentTime();
|
||||
while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
|
||||
for (byte[] region : busyRegions) {
|
||||
AtomicInteger count = taskCounterPerRegion.get(region);
|
||||
if (count == null || count.get() < maxConcurrentTasksPerRegion) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
synchronized (tasksInProgress) {
|
||||
tasksInProgress.wait(10);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted."
|
||||
+ " tasksInProgress=" + tasksInProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 1) check the regions is allowed. 2) check the concurrent tasks for
|
||||
* regions. 3) check the total concurrent tasks. 4) check the concurrent
|
||||
* tasks for server.
|
||||
*
|
||||
* @param loc
|
||||
* @param rowSize
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
|
||||
HRegionInfo regionInfo = loc.getRegionInfo();
|
||||
if (regionsIncluded.contains(regionInfo)) {
|
||||
// We already know what to do with this region.
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
|
||||
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
|
||||
// Too many tasks on this region already.
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
int newServers = serversIncluded.size()
|
||||
+ (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
|
||||
if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
|
||||
// Too many tasks.
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
|
||||
if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
|
||||
// Too many tasks for this individual server
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
regionsIncluded.add(loc.getRegionInfo());
|
||||
serversIncluded.add(loc.getServerName());
|
||||
}
|
||||
busyRegions.add(loc.getRegionInfo().getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* limit the request size for each regionserver.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class RequestSizeChecker implements RowChecker {
|
||||
|
||||
private final long maxHeapSizePerRequest;
|
||||
private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
|
||||
|
||||
RequestSizeChecker(final long maxHeapSizePerRequest) {
|
||||
this.maxHeapSizePerRequest = maxHeapSizePerRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
serverRequestSizes.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
|
||||
// Is it ok for limit of request size?
|
||||
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
|
||||
? serverRequestSizes.get(loc.getServerName()) : 0L;
|
||||
// accept at least one request
|
||||
if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
|
||||
? serverRequestSizes.get(loc.getServerName()) : 0L;
|
||||
serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a way to control the flow of rows iteration.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
interface RowChecker {
|
||||
|
||||
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
|
||||
|
||||
/**
|
||||
* Add the final ReturnCode to the checker. The ReturnCode may be reversed,
|
||||
* so the checker need the final decision to update the inner state.
|
||||
*
|
||||
* @param code The final decision
|
||||
* @param loc the destination of data
|
||||
* @param rowSize the data size
|
||||
*/
|
||||
void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
|
||||
|
||||
/**
|
||||
* Reset the inner state.
|
||||
*/
|
||||
void reset() throws InterruptedIOException;
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,336 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ClientTests.class, SmallTests.class})
|
||||
public class TestSimpleRequestController {
|
||||
|
||||
private static final TableName DUMMY_TABLE
|
||||
= TableName.valueOf("DUMMY_TABLE");
|
||||
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
|
||||
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
|
||||
private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
|
||||
private static final ServerName SN = ServerName.valueOf("s1:1,1");
|
||||
private static final ServerName SN2 = ServerName.valueOf("s2:2,2");
|
||||
private static final ServerName SN3 = ServerName.valueOf("s3:3,3");
|
||||
private static final HRegionInfo HRI1
|
||||
= new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
|
||||
private static final HRegionInfo HRI2
|
||||
= new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
|
||||
private static final HRegionInfo HRI3
|
||||
= new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
|
||||
private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
|
||||
private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
|
||||
private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
|
||||
|
||||
@Test
|
||||
public void testIllegalRequestSize() {
|
||||
testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalRsTasks() {
|
||||
testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalRegionTasks() {
|
||||
testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalSubmittedSize() {
|
||||
testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
|
||||
}
|
||||
|
||||
private void testIllegalArgument(String key, long value) {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
|
||||
try {
|
||||
SimpleRequestController controller = new SimpleRequestController(conf);
|
||||
fail("The " + key + " must be bigger than zero");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
}
|
||||
|
||||
private static Put createPut(long maxHeapSizePerRequest) {
|
||||
return new Put(Bytes.toBytes("row")) {
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return maxHeapSizePerRequest;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskCheckerHost() throws IOException {
|
||||
final int maxTotalConcurrentTasks = 100;
|
||||
final int maxConcurrentTasksPerServer = 2;
|
||||
final int maxConcurrentTasksPerRegion = 1;
|
||||
final AtomicLong tasksInProgress = new AtomicLong(0);
|
||||
final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
|
||||
final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
|
||||
SimpleRequestController.TaskCountChecker countChecker = new SimpleRequestController.TaskCountChecker(
|
||||
maxTotalConcurrentTasks,
|
||||
maxConcurrentTasksPerServer,
|
||||
maxConcurrentTasksPerRegion,
|
||||
tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
|
||||
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
|
||||
// unlimiited
|
||||
SimpleRequestController.RequestSizeChecker sizeChecker = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
|
||||
RequestController.Checker checker = SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
|
||||
ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
|
||||
assertEquals(ReturnCode.INCLUDE, loc1Code);
|
||||
|
||||
ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
|
||||
// rejected for size
|
||||
assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
|
||||
|
||||
ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
|
||||
// rejected for size
|
||||
assertNotEquals(ReturnCode.INCLUDE, loc2Code);
|
||||
|
||||
// fill the task slots for LOC3.
|
||||
taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
|
||||
taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
|
||||
|
||||
ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
|
||||
// rejected for count
|
||||
assertNotEquals(ReturnCode.INCLUDE, loc3Code);
|
||||
|
||||
// release the task slots for LOC3.
|
||||
taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
|
||||
taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
|
||||
|
||||
ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
|
||||
assertEquals(ReturnCode.INCLUDE, loc3Code_2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestSizeCheckerr() throws IOException {
|
||||
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
|
||||
SimpleRequestController.RequestSizeChecker checker
|
||||
= new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
|
||||
|
||||
// inner state is unchanged.
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
}
|
||||
|
||||
// accept the data located on LOC1 region.
|
||||
ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, acceptCode);
|
||||
checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
|
||||
|
||||
// the sn server reachs the limit.
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
|
||||
assertNotEquals(ReturnCode.INCLUDE, code);
|
||||
code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
|
||||
assertNotEquals(ReturnCode.INCLUDE, code);
|
||||
}
|
||||
|
||||
// the request to sn2 server should be accepted.
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
}
|
||||
|
||||
checker.reset();
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmittedSizeChecker() {
|
||||
final long maxHeapSizeSubmit = 2 * 1024 * 1024;
|
||||
SimpleRequestController.SubmittedSizeChecker checker
|
||||
= new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
|
||||
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode include = checker.canTakeOperation(LOC1, 100000);
|
||||
assertEquals(ReturnCode.INCLUDE, include);
|
||||
}
|
||||
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
|
||||
}
|
||||
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode include = checker.canTakeOperation(LOC1, 100000);
|
||||
assertEquals(ReturnCode.END, include);
|
||||
}
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode include = checker.canTakeOperation(LOC2, 100000);
|
||||
assertEquals(ReturnCode.END, include);
|
||||
}
|
||||
checker.reset();
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode include = checker.canTakeOperation(LOC1, 100000);
|
||||
assertEquals(ReturnCode.INCLUDE, include);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskCountChecker() throws InterruptedIOException {
|
||||
long rowSize = 12345;
|
||||
int maxTotalConcurrentTasks = 100;
|
||||
int maxConcurrentTasksPerServer = 2;
|
||||
int maxConcurrentTasksPerRegion = 1;
|
||||
AtomicLong tasksInProgress = new AtomicLong(0);
|
||||
Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
|
||||
Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
|
||||
SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
|
||||
maxTotalConcurrentTasks,
|
||||
maxConcurrentTasksPerServer,
|
||||
maxConcurrentTasksPerRegion,
|
||||
tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
|
||||
|
||||
// inner state is unchanged.
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
}
|
||||
// add LOC1 region.
|
||||
ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
|
||||
assertEquals(ReturnCode.INCLUDE, code);
|
||||
checker.notifyFinal(code, LOC1, rowSize);
|
||||
|
||||
// fill the task slots for LOC1.
|
||||
taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
|
||||
taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
|
||||
|
||||
// the region was previously accepted, so it must be accpted now.
|
||||
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
|
||||
ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
|
||||
assertEquals(ReturnCode.INCLUDE, includeCode);
|
||||
checker.notifyFinal(includeCode, LOC1, rowSize);
|
||||
}
|
||||
|
||||
// fill the task slots for LOC3.
|
||||
taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
|
||||
taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
|
||||
|
||||
// no task slots.
|
||||
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
|
||||
ReturnCode excludeCode = checker.canTakeOperation(LOC3, rowSize);
|
||||
assertNotEquals(ReturnCode.INCLUDE, excludeCode);
|
||||
checker.notifyFinal(excludeCode, LOC3, rowSize);
|
||||
}
|
||||
|
||||
// release the tasks for LOC3.
|
||||
taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
|
||||
taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
|
||||
|
||||
// add LOC3 region.
|
||||
ReturnCode code3 = checker.canTakeOperation(LOC3, rowSize);
|
||||
assertEquals(ReturnCode.INCLUDE, code3);
|
||||
checker.notifyFinal(code3, LOC3, rowSize);
|
||||
|
||||
// the region was previously accepted, so it must be accpted now.
|
||||
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
|
||||
ReturnCode includeCode = checker.canTakeOperation(LOC3, rowSize);
|
||||
assertEquals(ReturnCode.INCLUDE, includeCode);
|
||||
checker.notifyFinal(includeCode, LOC3, rowSize);
|
||||
}
|
||||
|
||||
checker.reset();
|
||||
// the region was previously accepted,
|
||||
// but checker have reseted and task slots for LOC1 is full.
|
||||
// So it must be rejected now.
|
||||
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
|
||||
ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
|
||||
assertNotEquals(ReturnCode.INCLUDE, includeCode);
|
||||
checker.notifyFinal(includeCode, LOC1, rowSize);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWaitForMaximumCurrentTasks() throws Exception {
|
||||
final AtomicInteger max = new AtomicInteger(0);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
|
||||
final AtomicLong tasks = controller.tasksInProgress;
|
||||
Runnable runnable = () -> {
|
||||
try {
|
||||
barrier.await();
|
||||
controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
|
||||
} catch (InterruptedIOException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
// First test that our runnable thread only exits when tasks is zero.
|
||||
Thread t = new Thread(runnable);
|
||||
t.start();
|
||||
barrier.await();
|
||||
t.join();
|
||||
// Now assert we stay running if max == zero and tasks is > 0.
|
||||
barrier.reset();
|
||||
tasks.set(1000000);
|
||||
t = new Thread(runnable);
|
||||
t.start();
|
||||
barrier.await();
|
||||
while (tasks.get() > 0) {
|
||||
assertTrue(t.isAlive());
|
||||
tasks.set(tasks.get() - 1);
|
||||
}
|
||||
t.join();
|
||||
}
|
||||
}
|
|
@ -126,11 +126,8 @@ public class HConnectionTestingUtility {
|
|||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(
|
||||
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
|
||||
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
|
||||
new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||
RpcControllerFactory.instantiate(conf)));
|
||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
|
||||
RpcRetryingCallerFactory.instantiate(conf,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.AsyncProcessTask;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
|
||||
|
@ -137,14 +138,20 @@ public class TestClientPushback {
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicLong endTime = new AtomicLong();
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback<Result>() {
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, Result result) {
|
||||
BufferedMutatorImpl mutator = ((HTable) table).mutator;
|
||||
Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
|
||||
endTime.set(EnvironmentEdgeManager.currentTime());
|
||||
latch.countDown();
|
||||
}
|
||||
}, true);
|
||||
};
|
||||
AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
|
||||
.setPool(mutator.getPool())
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(ops)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
|
||||
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
|
||||
.setRpcTimeout(60 * 1000)
|
||||
.build();
|
||||
mutator.getAsyncProcess().submit(task);
|
||||
// Currently the ExponentialClientBackoffPolicy under these test conditions
|
||||
// produces a backoffTime of 151 milliseconds. This is long enough so the
|
||||
// wait and related checks below are reasonable. Revisit if the backoff
|
||||
|
|
|
@ -563,9 +563,17 @@ public class TestReplicasClient {
|
|||
gets.add(g);
|
||||
Object[] results = new Object[2];
|
||||
|
||||
AsyncRequestFuture reqs = ap.submitAll(
|
||||
HTable.getDefaultExecutor(HTU.getConfiguration()),
|
||||
table.getName(), gets, null, results);
|
||||
int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
|
||||
int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
|
||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
||||
.setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
|
||||
.setTableName(table.getName())
|
||||
.setRowAccess(gets)
|
||||
.setResults(results)
|
||||
.setOperationTimeout(operationTimeout)
|
||||
.setRpcTimeout(readTimeout)
|
||||
.build();
|
||||
AsyncRequestFuture reqs = ap.submit(task);
|
||||
reqs.waitUntilDone();
|
||||
// verify we got the right results back
|
||||
for (Object r : results) {
|
||||
|
|
|
@ -501,7 +501,6 @@ public class TestPerColumnFamilyFlush {
|
|||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
table.close();
|
||||
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
|
||||
assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
|
||||
assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
|
||||
|
|
|
@ -171,22 +171,35 @@ public class TestTablePermissions {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The AccessControlLists.addUserPermission may throw exception before closing the table.
|
||||
*/
|
||||
private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException {
|
||||
try {
|
||||
AccessControlLists.addUserPermission(conf, userPerm, t);
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicWrite() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||
Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||
// add some permissions
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null,
|
||||
UserPermission.Action.READ, UserPermission.Action.WRITE), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
UserPermission.Action.READ, UserPermission.Action.WRITE),
|
||||
connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null,
|
||||
UserPermission.Action.READ), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
UserPermission.Action.READ),
|
||||
connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("humphrey"),
|
||||
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
|
||||
UserPermission.Action.READ), table);
|
||||
UserPermission.Action.READ),
|
||||
connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
}
|
||||
// retrieve the same
|
||||
ListMultimap<String,TablePermission> perms =
|
||||
|
@ -274,23 +287,22 @@ public class TestTablePermissions {
|
|||
@Test
|
||||
public void testPersistence() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||
Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null,
|
||||
(byte[])null, TablePermission.Action.READ), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
(byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null,
|
||||
(byte[])null, TablePermission.Action.READ,
|
||||
TablePermission.Action.WRITE), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("clark"),
|
||||
TEST_TABLE, TEST_FAMILY,
|
||||
TablePermission.Action.READ), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("dwight"),
|
||||
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
|
||||
TablePermission.Action.WRITE), table);
|
||||
TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
}
|
||||
// verify permissions survive changes in table metadata
|
||||
ListMultimap<String,TablePermission> preperms =
|
||||
|
@ -404,17 +416,17 @@ public class TestTablePermissions {
|
|||
Configuration conf = UTIL.getConfiguration();
|
||||
|
||||
// add some permissions
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||
Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("user1"),
|
||||
Permission.Action.READ, Permission.Action.WRITE), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("user2"),
|
||||
Permission.Action.CREATE), table);
|
||||
AccessControlLists.addUserPermission(conf,
|
||||
Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
addUserPermission(conf,
|
||||
new UserPermission(Bytes.toBytes("user3"),
|
||||
Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table);
|
||||
Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE),
|
||||
connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
}
|
||||
ListMultimap<String,TablePermission> perms = AccessControlLists.getTablePermissions(conf, null);
|
||||
List<TablePermission> user1Perms = perms.get("user1");
|
||||
|
@ -448,11 +460,11 @@ public class TestTablePermissions {
|
|||
// currently running user is the system user and should have global admin perms
|
||||
User currentUser = User.getCurrent();
|
||||
assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||
Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||
for (int i=1; i<=50; i++) {
|
||||
AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
|
||||
Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table);
|
||||
addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
|
||||
Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
|
||||
connection.getTable(AccessControlLists.ACL_TABLE_NAME));
|
||||
// make sure the system user still shows as authorized
|
||||
assertTrue("Failed current user auth check on iter "+i,
|
||||
authManager.authorize(currentUser, Permission.Action.ADMIN));
|
||||
|
|
Loading…
Reference in New Issue