HBASE-16224 Reduce the number of RPCs for the large PUTs (ChiaPing Tsai)

This commit is contained in:
chenheng 2016-08-30 06:35:33 +08:00
parent de5a3a006a
commit c000f29e47
5 changed files with 997 additions and 158 deletions

View File

@ -19,17 +19,13 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import com.google.common.annotations.VisibleForTesting;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -38,31 +34,39 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
/** /**
* This class allows a continuous flow of requests. It's written to be compatible with a * This class allows a continuous flow of requests. It's written to be compatible with a
* synchronous caller such as HTable. * synchronous caller such as HTable.
@ -126,6 +130,25 @@ class AsyncProcess {
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
/**
* The maximum size of single RegionServer.
*/
public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
/**
* Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
*/
public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
/**
* The maximum size of submit.
*/
public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
/**
* Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
*/
public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
/** /**
* The context used to wait for results from one submit call. * The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@ -208,7 +231,6 @@ class AsyncProcess {
new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
new ConcurrentHashMap<ServerName, AtomicInteger>(); new ConcurrentHashMap<ServerName, AtomicInteger>();
// Start configuration settings. // Start configuration settings.
private final int startLogErrorsCnt; private final int startLogErrorsCnt;
@ -217,6 +239,11 @@ class AsyncProcess {
*/ */
protected final int maxTotalConcurrentTasks; protected final int maxTotalConcurrentTasks;
/**
* The max heap size of all tasks simultaneously executed on a server.
*/
protected final long maxHeapSizePerRequest;
protected final long maxHeapSizeSubmit;
/** /**
* The number of tasks we run in parallel on a single region. * The number of tasks we run in parallel on a single region.
* With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
@ -278,7 +305,6 @@ class AsyncProcess {
addresses.addAll(other.addresses); addresses.addAll(other.addresses);
} }
} }
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
RpcControllerFactory rpcFactory, int rpcTimeout) { RpcControllerFactory rpcFactory, int rpcTimeout) {
@ -306,7 +332,9 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.startLogErrorsCnt = this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@ -321,7 +349,15 @@ class AsyncProcess {
throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
maxConcurrentTasksPerRegion); maxConcurrentTasksPerRegion);
} }
if (this.maxHeapSizePerRequest <= 0) {
throw new IllegalArgumentException("maxHeapSizePerServer=" +
maxHeapSizePerRequest);
}
if (this.maxHeapSizeSubmit <= 0) {
throw new IllegalArgumentException("maxHeapSizeSubmit=" +
maxHeapSizeSubmit);
}
// Server tracker allows us to do faster, and yet useful (hopefully), retries. // Server tracker allows us to do faster, and yet useful (hopefully), retries.
// However, if we are too useful, we might fail very quickly due to retry count limit. // However, if we are too useful, we might fail very quickly due to retry count limit.
// To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
@ -356,16 +392,34 @@ class AsyncProcess {
} }
throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
} }
/** /**
* See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}.
* Uses default ExecutorService for this AP (must have been created with one). * Uses default ExecutorService for this AP (must have been created with one).
*/ */
public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows,
boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
throws InterruptedIOException { throws InterruptedIOException {
return submit(null, tableName, rows, atLeastOne, callback, needResults); return submit(null, tableName, rows, atLeastOne, callback, needResults);
} }
/**
* See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
* Uses default ExecutorService for this AP (must have been created with one).
*/
public <CResult> AsyncRequestFuture submit(TableName tableName,
final RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) throws InterruptedIOException {
return submit(null, tableName, rows, atLeastOne, callback, needResults);
}
/**
* See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
* Uses the {@link ListRowAccess} to wrap the {@link List}.
*/
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) throws InterruptedIOException {
return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
callback, needResults);
}
/** /**
* Extract from the rows list what we can submit. The rows we can not submit are kept in the * Extract from the rows list what we can submit. The rows we can not submit are kept in the
@ -380,7 +434,7 @@ class AsyncProcess {
* @param atLeastOne true if we should submit at least a subset. * @param atLeastOne true if we should submit at least a subset.
*/ */
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) throws InterruptedIOException { boolean needResults) throws InterruptedIOException {
if (rows.isEmpty()) { if (rows.isEmpty()) {
return NO_REQS_RESULT; return NO_REQS_RESULT;
@ -396,16 +450,15 @@ class AsyncProcess {
// Location errors that happen before we decide what requests to take. // Location errors that happen before we decide what requests to take.
List<Exception> locationErrors = null; List<Exception> locationErrors = null;
List<Integer> locationErrorRows = null; List<Integer> locationErrorRows = null;
RowCheckerHost checker = createRowCheckerHost();
boolean firstIter = true;
do { do {
// Wait until there is at least one slot for a new task. // Wait until there is at least one slot for a new task.
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
// Remember the previous decisions about regions or region servers we put in the
// final multi.
Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
int posInList = -1; int posInList = -1;
if (!firstIter) {
checker.reset();
}
Iterator<? extends Row> it = rows.iterator(); Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) { while (it.hasNext()) {
Row r = it.next(); Row r = it.next();
@ -434,8 +487,12 @@ class AsyncProcess {
it.remove(); it.remove();
break; // Backward compat: we stop considering actions on location error. break; // Backward compat: we stop considering actions on location error.
} }
long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0;
if (canTakeOperation(loc, regionIncluded, serverIncluded)) { ReturnCode code = checker.canTakeOperation(loc, rowSize);
if (code == ReturnCode.END) {
break;
}
if (code == ReturnCode.INCLUDE) {
Action<Row> action = new Action<Row>(r, ++posInList); Action<Row> action = new Action<Row>(r, ++posInList);
setNonce(ng, r, action); setNonce(ng, r, action);
retainedActions.add(action); retainedActions.add(action);
@ -445,6 +502,7 @@ class AsyncProcess {
it.remove(); it.remove();
} }
} }
firstIter = false;
} while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
if (retainedActions.isEmpty()) return NO_REQS_RESULT; if (retainedActions.isEmpty()) return NO_REQS_RESULT;
@ -453,6 +511,18 @@ class AsyncProcess {
locationErrors, locationErrorRows, actionsByServer, pool); locationErrors, locationErrorRows, actionsByServer, pool);
} }
private RowCheckerHost createRowCheckerHost() {
return new RowCheckerHost(Arrays.asList(
new TaskCountChecker(maxTotalConcurrentTasks,
maxConcurrentTasksPerServer,
maxConcurrentTasksPerRegion,
tasksInProgress,
taskCounterPerServer,
taskCounterPerRegion)
, new RequestSizeChecker(maxHeapSizePerRequest)
, new SubmittedSizeChecker(maxHeapSizeSubmit)
));
}
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName, <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors, Object[] results, boolean needResults, List<Exception> locationErrors,
@ -494,74 +564,6 @@ class AsyncProcess {
multiAction.add(regionName, action); multiAction.add(regionName, action);
} }
/**
* Check if we should send new operations to this region or region server.
* We're taking into account the past decision; if we have already accepted
* operation on a given region, we accept all operations for this region.
*
* @param loc; the region and the server name we want to use.
* @return true if this region is considered as busy.
*/
protected boolean canTakeOperation(HRegionLocation loc,
Map<HRegionInfo, Boolean> regionsIncluded,
Map<ServerName, Boolean> serversIncluded) {
HRegionInfo regionInfo = loc.getRegionInfo();
Boolean regionPrevious = regionsIncluded.get(regionInfo);
if (regionPrevious != null) {
// We already know what to do with this region.
return regionPrevious;
}
Boolean serverPrevious = serversIncluded.get(loc.getServerName());
if (Boolean.FALSE.equals(serverPrevious)) {
// It's a new region, on a region server that we have already excluded.
regionsIncluded.put(regionInfo, Boolean.FALSE);
return false;
}
AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
// Too many tasks on this region already.
regionsIncluded.put(regionInfo, Boolean.FALSE);
return false;
}
if (serverPrevious == null) {
// The region is ok, but we need to decide for this region server.
int newServers = 0; // number of servers we're going to contact so far
for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
if (kv.getValue()) {
newServers++;
}
}
// Do we have too many total tasks already?
boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
if (ok) {
// If the total is fine, is it ok for this individual server?
AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
}
if (!ok) {
regionsIncluded.put(regionInfo, Boolean.FALSE);
serversIncluded.put(loc.getServerName(), Boolean.FALSE);
return false;
}
serversIncluded.put(loc.getServerName(), Boolean.TRUE);
} else {
assert serverPrevious.equals(Boolean.TRUE);
}
regionsIncluded.put(regionInfo, Boolean.TRUE);
return true;
}
/** /**
* See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}.
* Uses default ExecutorService for this AP (must have been created with one). * Uses default ExecutorService for this AP (must have been created with one).
@ -740,7 +742,7 @@ class AsyncProcess {
private final int numAttempt; private final int numAttempt;
private final ServerName server; private final ServerName server;
private final Set<CancellableRegionServerCallable> callsInProgress; private final Set<CancellableRegionServerCallable> callsInProgress;
private Long heapSize = null;
private SingleServerRequestRunnable( private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server, MultiAction<Row> multiAction, int numAttempt, ServerName server,
Set<CancellableRegionServerCallable> callsInProgress) { Set<CancellableRegionServerCallable> callsInProgress) {
@ -750,6 +752,24 @@ class AsyncProcess {
this.callsInProgress = callsInProgress; this.callsInProgress = callsInProgress;
} }
@VisibleForTesting
long heapSize() {
if (heapSize != null) {
return heapSize;
}
heapSize = 0L;
for (Map.Entry<byte[], List<Action<Row>>> e: this.multiAction.actions.entrySet()) {
List<Action<Row>> actions = e.getValue();
for (Action<Row> action: actions) {
Row row = action.getAction();
if (row instanceof Mutation) {
heapSize += ((Mutation) row).heapSize();
}
}
}
return heapSize;
}
@Override @Override
public void run() { public void run() {
MultiResponse res; MultiResponse res;
@ -831,7 +851,7 @@ class AsyncProcess {
private final long nonceGroup; private final long nonceGroup;
private CancellableRegionServerCallable currentCallable; private CancellableRegionServerCallable currentCallable;
private int currentCallTotalTimeout; private int currentCallTotalTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results, ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<CResult> callback, CancellableRegionServerCallable callable, int timeout) { Batch.Callback<CResult> callback, CancellableRegionServerCallable callable, int timeout) {
@ -910,7 +930,21 @@ class AsyncProcess {
public Set<CancellableRegionServerCallable> getCallsInProgress() { public Set<CancellableRegionServerCallable> getCallsInProgress() {
return callsInProgress; return callsInProgress;
} }
@VisibleForTesting
Map<ServerName, List<Long>> getRequestHeapSize() {
return heapSizesByServer;
}
private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
SingleServerRequestRunnable runnable) {
List<Long> heapCount = heapSizesByServer.get(server);
if (heapCount == null) {
heapCount = new LinkedList<>();
heapSizesByServer.put(server, heapCount);
}
heapCount.add(runnable.heapSize());
return runnable;
}
/** /**
* Group a list of actions per region servers, and send them. * Group a list of actions per region servers, and send them.
* *
@ -1080,8 +1114,9 @@ class AsyncProcess {
if (connection.getConnectionMetrics() != null) { if (connection.getConnectionMetrics() != null) {
connection.getConnectionMetrics().incrNormalRunners(); connection.getConnectionMetrics().incrNormalRunners();
} }
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
} }
// group the actions by the amount of delay // group the actions by the amount of delay
@ -1102,9 +1137,8 @@ class AsyncProcess {
List<Runnable> toReturn = new ArrayList<Runnable>(actions.size()); List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
for (DelayingRunner runner : actions.values()) { for (DelayingRunner runner : actions.values()) {
String traceText = "AsyncProcess.sendMultiAction"; String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = Runnable runnable = addSingleServerRequestHeapSize(server,
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
callsInProgress);
// use a delay runner only if we need to sleep for some time // use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) { if (runner.getSleepTime() > 0) {
runner.setRunner(runnable); runner.setRunner(runnable);
@ -1941,4 +1975,284 @@ class AsyncProcess {
NO_RETRIES_EXHAUSTED, NO_RETRIES_EXHAUSTED,
NO_OTHER_SUCCEEDED NO_OTHER_SUCCEEDED
} }
/**
* Collect all advices from checkers and make the final decision.
*/
@VisibleForTesting
static class RowCheckerHost {
private final List<RowChecker> checkers;
private boolean isEnd = false;
RowCheckerHost(final List<RowChecker> checkers) {
this.checkers = checkers;
}
void reset() throws InterruptedIOException {
isEnd = false;
InterruptedIOException e = null;
for (RowChecker checker : checkers) {
try {
checker.reset();
} catch (InterruptedIOException ex) {
e = ex;
}
}
if (e != null) {
throw e;
}
}
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
if (isEnd) {
return ReturnCode.END;
}
ReturnCode code = ReturnCode.INCLUDE;
for (RowChecker checker : checkers) {
switch (checker.canTakeOperation(loc, rowSize)) {
case END:
isEnd = true;
code = ReturnCode.END;
break;
case SKIP:
code = ReturnCode.SKIP;
break;
case INCLUDE:
default:
break;
}
if (code == ReturnCode.END) {
break;
}
}
for (RowChecker checker : checkers) {
checker.notifyFinal(code, loc, rowSize);
}
return code;
}
}
/**
* Provide a way to control the flow of rows iteration.
*/
@VisibleForTesting
interface RowChecker {
enum ReturnCode {
/**
* Accept current row.
*/
INCLUDE,
/**
* Skip current row.
*/
SKIP,
/**
* No more row can be included.
*/
END
};
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
/**
* Add the final ReturnCode to the checker.
* The ReturnCode may be reversed, so the checker need the final decision to update
* the inner state.
*/
void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
/**
* Reset the inner state.
*/
void reset() throws InterruptedIOException ;
}
/**
* limit the heapsize of total submitted data.
* Reduce the limit of heapsize for submitting quickly
* if there is no running task.
*/
@VisibleForTesting
static class SubmittedSizeChecker implements RowChecker {
private final long maxHeapSizeSubmit;
private long heapSize = 0;
SubmittedSizeChecker(final long maxHeapSizeSubmit) {
this.maxHeapSizeSubmit = maxHeapSizeSubmit;
}
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
if (heapSize >= maxHeapSizeSubmit) {
return ReturnCode.END;
}
return ReturnCode.INCLUDE;
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
if (code == ReturnCode.INCLUDE) {
heapSize += rowSize;
}
}
@Override
public void reset() {
heapSize = 0;
}
}
/**
* limit the max number of tasks in an AsyncProcess.
*/
@VisibleForTesting
static class TaskCountChecker implements RowChecker {
private static final long MAX_WAITING_TIME = 1000; //ms
private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
private final Set<ServerName> serversIncluded = new HashSet<>();
private final int maxConcurrentTasksPerRegion;
private final int maxTotalConcurrentTasks;
private final int maxConcurrentTasksPerServer;
private final Map<byte[], AtomicInteger> taskCounterPerRegion;
private final Map<ServerName, AtomicInteger> taskCounterPerServer;
private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
private final AtomicLong tasksInProgress;
TaskCountChecker(final int maxTotalConcurrentTasks,
final int maxConcurrentTasksPerServer,
final int maxConcurrentTasksPerRegion,
final AtomicLong tasksInProgress,
final Map<ServerName, AtomicInteger> taskCounterPerServer,
final Map<byte[], AtomicInteger> taskCounterPerRegion) {
this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
this.taskCounterPerRegion = taskCounterPerRegion;
this.taskCounterPerServer = taskCounterPerServer;
this.tasksInProgress = tasksInProgress;
}
@Override
public void reset() throws InterruptedIOException {
// prevent the busy-waiting
waitForRegion();
regionsIncluded.clear();
serversIncluded.clear();
busyRegions.clear();
}
private void waitForRegion() throws InterruptedIOException {
if (busyRegions.isEmpty()) {
return;
}
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
final long start = ee.currentTime();
while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
for (byte[] region : busyRegions) {
AtomicInteger count = taskCounterPerRegion.get(region);
if (count == null || count.get() < maxConcurrentTasksPerRegion) {
return;
}
}
try {
synchronized (tasksInProgress) {
tasksInProgress.wait(10);
}
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted." +
" tasksInProgress=" + tasksInProgress);
}
}
}
/**
* 1) check the regions is allowed.
* 2) check the concurrent tasks for regions.
* 3) check the total concurrent tasks.
* 4) check the concurrent tasks for server.
* @param loc
* @param rowSize
* @return
*/
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
HRegionInfo regionInfo = loc.getRegionInfo();
if (regionsIncluded.contains(regionInfo)) {
// We already know what to do with this region.
return ReturnCode.INCLUDE;
}
AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
// Too many tasks on this region already.
return ReturnCode.SKIP;
}
int newServers = serversIncluded.size()
+ (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
// Too many tasks.
return ReturnCode.SKIP;
}
AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
// Too many tasks for this individual server
return ReturnCode.SKIP;
}
return ReturnCode.INCLUDE;
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
if (code == ReturnCode.INCLUDE) {
regionsIncluded.add(loc.getRegionInfo());
serversIncluded.add(loc.getServerName());
}
busyRegions.add(loc.getRegionInfo().getRegionName());
}
}
/**
* limit the request size for each regionserver.
*/
@VisibleForTesting
static class RequestSizeChecker implements RowChecker {
private final long maxHeapSizePerRequest;
private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
RequestSizeChecker(final long maxHeapSizePerRequest) {
this.maxHeapSizePerRequest = maxHeapSizePerRequest;
}
@Override
public void reset() {
serverRequestSizes.clear();
}
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
// Is it ok for limit of request size?
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
serverRequestSizes.get(loc.getServerName()) : 0L;
// accept at least one request
if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
return ReturnCode.INCLUDE;
}
return ReturnCode.SKIP;
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
if (code == ReturnCode.INCLUDE) {
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
serverRequestSizes.get(loc.getServerName()) : 0L;
serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
}
}
}
public static class ListRowAccess<T> implements RowAccess<T> {
private final List<T> data;
ListRowAccess(final List<T> data) {
this.data = data;
}
@Override
public int size() {
return data.size();
}
@Override
public boolean isEmpty() {
return data.isEmpty();
}
@Override
public Iterator<T> iterator() {
return data.iterator();
}
}
} }

View File

@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -69,6 +71,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
@VisibleForTesting @VisibleForTesting
AtomicLong currentWriteBufferSize = new AtomicLong(0); AtomicLong currentWriteBufferSize = new AtomicLong(0);
/**
* Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
* The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
*/
@VisibleForTesting
AtomicInteger undealtMutationCount = new AtomicInteger(0);
private long writeBufferSize; private long writeBufferSize;
private final int maxKeyValueSize; private final int maxKeyValueSize;
private boolean closed = false; private boolean closed = false;
@ -129,11 +137,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
long toAddSize = 0; long toAddSize = 0;
int toAddCount = 0;
for (Mutation m : ms) { for (Mutation m : ms) {
if (m instanceof Put) { if (m instanceof Put) {
validatePut((Put) m); validatePut((Put) m);
} }
toAddSize += m.heapSize(); toAddSize += m.heapSize();
++toAddCount;
} }
// This behavior is highly non-intuitive... it does not protect us against // This behavior is highly non-intuitive... it does not protect us against
@ -142,14 +152,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (ap.hasError()) { if (ap.hasError()) {
currentWriteBufferSize.addAndGet(toAddSize); currentWriteBufferSize.addAndGet(toAddSize);
writeAsyncBuffer.addAll(ms); writeAsyncBuffer.addAll(ms);
undealtMutationCount.addAndGet(toAddCount);
backgroundFlushCommits(true); backgroundFlushCommits(true);
} else { } else {
currentWriteBufferSize.addAndGet(toAddSize); currentWriteBufferSize.addAndGet(toAddSize);
writeAsyncBuffer.addAll(ms); writeAsyncBuffer.addAll(ms);
undealtMutationCount.addAndGet(toAddCount);
} }
// Now try and queue what needs to be queued. // Now try and queue what needs to be queued.
while (currentWriteBufferSize.get() > writeBufferSize) { while (undealtMutationCount.get() != 0
&& currentWriteBufferSize.get() > writeBufferSize) {
backgroundFlushCommits(false); backgroundFlushCommits(false);
} }
} }
@ -208,42 +221,33 @@ public class BufferedMutatorImpl implements BufferedMutator {
private void backgroundFlushCommits(boolean synchronous) throws private void backgroundFlushCommits(boolean synchronous) throws
InterruptedIOException, InterruptedIOException,
RetriesExhaustedWithDetailsException { RetriesExhaustedWithDetailsException {
if (!synchronous && writeAsyncBuffer.isEmpty()) {
LinkedList<Mutation> buffer = new LinkedList<>();
// Keep track of the size so that this thread doesn't spin forever
long dequeuedSize = 0;
try {
// Grab all of the available mutations.
Mutation m;
// If there's no buffer size drain everything. If there is a buffersize drain up to twice
// that amount. This should keep the loop from continually spinning if there are threads
// that keep adding more data to the buffer.
while (
(writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous)
&& (m = writeAsyncBuffer.poll()) != null) {
buffer.add(m);
long size = m.heapSize();
dequeuedSize += size;
currentWriteBufferSize.addAndGet(-size);
}
if (!synchronous && dequeuedSize == 0) {
return; return;
} }
if (!synchronous) { if (!synchronous) {
ap.submit(tableName, buffer, true, null, false); QueueRowAccess taker = new QueueRowAccess();
try {
ap.submit(tableName, taker, true, null, false);
if (ap.hasError()) { if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)"); + " waiting for all operation in progress to finish (successfully or not)");
} }
} finally {
taker.restoreRemainder();
}
} }
if (synchronous || ap.hasError()) { if (synchronous || ap.hasError()) {
while (!buffer.isEmpty()) { QueueRowAccess taker = new QueueRowAccess();
ap.submit(tableName, buffer, true, null, false); try {
while (!taker.isEmpty()) {
ap.submit(tableName, taker, true, null, false);
taker.reset();
} }
} finally {
taker.restoreRemainder();
}
RetriesExhaustedWithDetailsException error = RetriesExhaustedWithDetailsException error =
ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
if (error != null) { if (error != null) {
@ -254,14 +258,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
} }
} }
} finally {
for (Mutation mut : buffer) {
long size = mut.heapSize();
currentWriteBufferSize.addAndGet(size);
dequeuedSize -= size;
writeAsyncBuffer.add(mut);
}
}
} }
/** /**
@ -285,4 +281,67 @@ public class BufferedMutatorImpl implements BufferedMutator {
public long getWriteBufferSize() { public long getWriteBufferSize() {
return this.writeBufferSize; return this.writeBufferSize;
} }
private class QueueRowAccess implements RowAccess<Row> {
private int remainder = undealtMutationCount.getAndSet(0);
void reset() {
restoreRemainder();
remainder = undealtMutationCount.getAndSet(0);
}
@Override
public Iterator<Row> iterator() {
return new Iterator<Row>() {
private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
private int countDown = remainder;
private Mutation last = null;
@Override
public boolean hasNext() {
if (countDown <= 0) {
return false;
}
return iter.hasNext();
}
@Override
public Row next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
last = iter.next();
if (last == null) {
throw new NoSuchElementException();
}
--countDown;
return last;
}
@Override
public void remove() {
if (last == null) {
throw new IllegalStateException();
}
iter.remove();
currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder;
}
};
}
@Override
public int size() {
return remainder;
}
void restoreRemainder() {
if (remainder > 0) {
undealtMutationCount.addAndGet(remainder);
remainder = 0;
}
}
@Override
public boolean isEmpty() {
return remainder <= 0;
}
}
} }

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Provide a way to access the inner buffer.
* The purpose is to reduce the elapsed time to move a large number
* of elements between collections.
* @param <T>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@VisibleForTesting
interface RowAccess<T> extends Iterable<T> {
/**
* @return true if there are no elements.
*/
boolean isEmpty();
/**
* @return the number of elements in this list.
*/
int size();
}

View File

@ -25,8 +25,11 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -57,6 +60,12 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost;
import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -65,13 +74,34 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert; import org.junit.Assert;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({ClientTests.class, MediumTests.class}) @Category({ClientTests.class, MediumTests.class})
public class TestAsyncProcess { public class TestAsyncProcess {
@ -180,6 +210,13 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
} }
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, RowAccess<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults)
throws InterruptedIOException {
// We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
@Override @Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults) boolean atLeastOne, Callback<Res> callback, boolean needResults)
@ -435,7 +472,186 @@ public class TestAsyncProcess {
return null; return null;
} }
} }
@Test
public void testListRowAccess() {
int count = 10;
List<String> values = new LinkedList<>();
for (int i = 0; i != count; ++i) {
values.add(String.valueOf(i));
}
ListRowAccess<String> taker = new ListRowAccess(values);
assertEquals(count, taker.size());
int restoreCount = 0;
int takeCount = 0;
Iterator<String> it = taker.iterator();
while (it.hasNext()) {
String v = it.next();
assertEquals(String.valueOf(takeCount), v);
++takeCount;
it.remove();
if (Math.random() >= 0.5) {
break;
}
}
assertEquals(count, taker.size() + takeCount);
it = taker.iterator();
while (it.hasNext()) {
String v = it.next();
assertEquals(String.valueOf(takeCount), v);
++takeCount;
it.remove();
}
assertEquals(0, taker.size());
assertEquals(count, takeCount);
}
private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
if (putSizePerServer <= maxHeapSizePerRequest) {
return 1;
} else if (putSizePerServer % maxHeapSizePerRequest == 0) {
return putSizePerServer / maxHeapSizePerRequest;
} else {
return putSizePerServer / maxHeapSizePerRequest + 1;
}
}
@Test
public void testSubmitSameSizeOfRequest() throws Exception {
long writeBuffer = 2 * 1024 * 1024;
long putsHeapSize = writeBuffer;
doSubmitRequest(writeBuffer, putsHeapSize);
}
@Test
public void testIllegalArgument() throws IOException {
ClusterConnection conn = createHConnection();
final long maxHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
try {
MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
fail("The maxHeapSizePerRequest must be bigger than zero");
} catch (IllegalArgumentException e) {
}
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
}
@Test
public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
long maxHeapSizePerRequest = Long.MAX_VALUE;
long putsHeapSize = 2 * 1024 * 1024;
doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
}
@Test(timeout=300000)
public void testSubmitRandomSizeRequest() throws Exception {
Random rn = new Random();
final long limit = 10 * 1024 * 1024;
for (int count = 0; count != 2; ++count) {
long maxHeapSizePerRequest = Math.max(1, (Math.abs(rn.nextLong()) % limit));
long putsHeapSize = Math.max(1, (Math.abs(rn.nextLong()) % limit));
LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + ", putsHeapSize=" + putsHeapSize);
doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
}
}
@Test
public void testSubmitSmallRequest() throws Exception {
long maxHeapSizePerRequest = 2 * 1024 * 1024;
long putsHeapSize = 100;
doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
}
@Test(timeout=120000)
public void testSubmitLargeRequest() throws Exception {
long maxHeapSizePerRequest = 2 * 1024 * 1024;
long putsHeapSize = maxHeapSizePerRequest * 2;
doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
}
private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
ClusterConnection conn = createHConnection();
final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
// sn has two regions
long putSizeSN = 0;
long putSizeSN2 = 0;
List<Put> puts = new ArrayList<>();
while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
Put put1 = new Put(DUMMY_BYTES_1);
put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
Put put2 = new Put(DUMMY_BYTES_2);
put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
Put put3 = new Put(DUMMY_BYTES_3);
put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
putSizeSN += (put1.heapSize() + put2.heapSize());
putSizeSN2 += put3.heapSize();
puts.add(put1);
puts.add(put2);
puts.add(put3);
}
int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN + ", putSizeSN2:" + putSizeSN2
+ ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
+ ", minCountSnRequest:" + minCountSnRequest
+ ", minCountSn2Request:" + minCountSn2Request);
try (HTable ht = new HTable(conn, bufferParam)) {
MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
ht.mutator.ap = ap;
Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
ht.put(puts);
List<AsyncRequestFuture> reqs = ap.allReqs;
int actualSnReqCount = 0;
int actualSn2ReqCount = 0;
for (AsyncRequestFuture req : reqs) {
if (!(req instanceof AsyncRequestFutureImpl)) {
continue;
}
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
if (ars.getRequestHeapSize().containsKey(sn)) {
++actualSnReqCount;
}
if (ars.getRequestHeapSize().containsKey(sn2)) {
++actualSn2ReqCount;
}
}
// If the server is busy, the actual count may be incremented.
assertEquals(true, minCountSnRequest <= actualSnReqCount);
assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
Map<ServerName, Long> sizePerServers = new HashMap<>();
for (AsyncRequestFuture req : reqs) {
if (!(req instanceof AsyncRequestFutureImpl)) {
continue;
}
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
long sum = 0;
for (long size : entry.getValue()) {
assertEquals(true, size <= maxHeapSizePerRequest);
sum += size;
}
assertEquals(true, sum <= maxHeapSizePerRequest);
long value = sizePerServers.containsKey(entry.getKey()) ? sizePerServers.get(entry.getKey()) : 0L;
sizePerServers.put(entry.getKey(), value + sum);
}
}
assertEquals(true, sizePerServers.containsKey(sn));
assertEquals(true, sizePerServers.containsKey(sn2));
assertEquals(false, sizePerServers.containsKey(sn3));
assertEquals(putSizeSN, (long) sizePerServers.get(sn));
assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
}
// restore config.
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest);
}
@Test @Test
public void testSubmit() throws Exception { public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection(); ClusterConnection hc = createHConnection();
@ -477,7 +693,9 @@ public class TestAsyncProcess {
List<Put> puts = new ArrayList<Put>(); List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true)); puts.add(createPut(1, true));
for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
}
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(puts.size(), 1); Assert.assertEquals(puts.size(), 1);
@ -538,7 +756,7 @@ public class TestAsyncProcess {
public void testSubmitTrue() throws IOException { public void testSubmitTrue() throws IOException {
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
ap.tasksInProgress.incrementAndGet(); ap.tasksInProgress.incrementAndGet();
final AtomicInteger ai = new AtomicInteger(1); final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion);
ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
final AtomicBoolean checkPoint = new AtomicBoolean(false); final AtomicBoolean checkPoint = new AtomicBoolean(false);
@ -672,6 +890,8 @@ public class TestAsyncProcess {
setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
.thenReturn(Arrays.asList(loc1, loc2, loc3));
setMockLocation(hc, FAILS, new RegionLocations(loc2)); setMockLocation(hc, FAILS, new RegionLocations(loc2));
return hc; return hc;
} }
@ -681,6 +901,18 @@ public class TestAsyncProcess {
setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_1, hrls1);
setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_2, hrls2);
setMockLocation(hc, DUMMY_BYTES_3, hrls3); setMockLocation(hc, DUMMY_BYTES_3, hrls3);
List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
for (HRegionLocation loc : hrls1.getRegionLocations()) {
locations.add(loc);
}
for (HRegionLocation loc : hrls2.getRegionLocations()) {
locations.add(loc);
}
for (HRegionLocation loc : hrls3.getRegionLocations()) {
locations.add(loc);
}
Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
.thenReturn(locations);
return hc; return hc;
} }
@ -688,6 +920,8 @@ public class TestAsyncProcess {
RegionLocations result) throws IOException { RegionLocations result) throws IOException {
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
} }
private static ClusterConnection createHConnectionCommon() { private static ClusterConnection createHConnectionCommon() {
@ -788,6 +1022,195 @@ public class TestAsyncProcess {
Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size());
} }
@Test
public void testTaskCheckerHost() throws IOException {
final int maxTotalConcurrentTasks = 100;
final int maxConcurrentTasksPerServer = 2;
final int maxConcurrentTasksPerRegion = 1;
final AtomicLong tasksInProgress = new AtomicLong(0);
final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
TaskCountChecker countChecker = new TaskCountChecker(
maxTotalConcurrentTasks,
maxConcurrentTasksPerServer,
maxConcurrentTasksPerRegion,
tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
// unlimiited
RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest);
RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker));
ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code);
ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
// rejected for size
assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2);
ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest);
// rejected for size
assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code);
// fill the task slots for loc3.
taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L);
// rejected for count
assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code);
// release the task slots for loc3.
taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L);
assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2);
}
@Test
public void testRequestSizeCheckerr() throws IOException {
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
final ClusterConnection conn = createHConnection();
RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest);
// inner state is unchanged.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
}
// accept the data located on loc1 region.
ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode);
checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest);
// the sn server reachs the limit.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
}
// the request to sn2 server should be accepted.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
}
checker.reset();
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
}
}
@Test
public void testSubmittedSizeChecker() {
final long maxHeapSizeSubmit = 2 * 1024 * 1024;
SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit);
for (int i = 0; i != 10; ++i) {
ReturnCode include = checker.canTakeOperation(loc1, 100000);
assertEquals(ReturnCode.INCLUDE, include);
}
for (int i = 0; i != 10; ++i) {
checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit);
}
for (int i = 0; i != 10; ++i) {
ReturnCode include = checker.canTakeOperation(loc1, 100000);
assertEquals(ReturnCode.END, include);
}
for (int i = 0; i != 10; ++i) {
ReturnCode include = checker.canTakeOperation(loc2, 100000);
assertEquals(ReturnCode.END, include);
}
checker.reset();
for (int i = 0; i != 10; ++i) {
ReturnCode include = checker.canTakeOperation(loc1, 100000);
assertEquals(ReturnCode.INCLUDE, include);
}
}
@Test
public void testTaskCountChecker() throws InterruptedIOException {
long rowSize = 12345;
int maxTotalConcurrentTasks = 100;
int maxConcurrentTasksPerServer = 2;
int maxConcurrentTasksPerRegion = 1;
AtomicLong tasksInProgress = new AtomicLong(0);
Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
TaskCountChecker checker = new TaskCountChecker(
maxTotalConcurrentTasks,
maxConcurrentTasksPerServer,
maxConcurrentTasksPerRegion,
tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
// inner state is unchanged.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(loc1, rowSize);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
}
// add loc1 region.
ReturnCode code = checker.canTakeOperation(loc1, rowSize);
assertEquals(RowChecker.ReturnCode.INCLUDE, code);
checker.notifyFinal(code, loc1, rowSize);
// fill the task slots for loc1.
taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100));
taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100));
// the region was previously accepted, so it must be accpted now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, loc1, rowSize);
}
// fill the task slots for loc3.
taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
// no task slots.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize);
assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode);
checker.notifyFinal(excludeCode, loc3, rowSize);
}
// release the tasks for loc3.
taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
// add loc3 region.
ReturnCode code3 = checker.canTakeOperation(loc3, rowSize);
assertEquals(RowChecker.ReturnCode.INCLUDE, code3);
checker.notifyFinal(code3, loc3, rowSize);
// the region was previously accepted, so it must be accpted now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize);
assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, loc3, rowSize);
}
checker.reset();
// the region was previously accepted,
// but checker have reseted and task slots for loc1 is full.
// So it must be rejected now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, loc1, rowSize);
}
}
@Test @Test
public void testBatch() throws IOException, InterruptedException { public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(conf); ClusterConnection conn = new MyConnectionImpl(conf);
@ -818,7 +1241,6 @@ public class TestAsyncProcess {
Assert.assertEquals(res[5], success); Assert.assertEquals(res[5], success);
Assert.assertEquals(res[6], failure); Assert.assertEquals(res[6], failure);
} }
@Test @Test
public void testErrorsServers() throws IOException { public void testErrorsServers() throws IOException {
Configuration configuration = new Configuration(conf); Configuration configuration = new Configuration(conf);