HBASE-9676 AsyncProcess can create more tasks than hbase.client.max.total.tasks
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1528340 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
31c21f4a3b
commit
18d9251a77
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -87,6 +88,7 @@ import org.cloudera.htrace.Trace;
|
||||||
*/
|
*/
|
||||||
class AsyncProcess<CResult> {
|
class AsyncProcess<CResult> {
|
||||||
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
|
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
|
||||||
|
private final static int START_LOG_ERRORS_CNT = 4;
|
||||||
protected final HConnection hConnection;
|
protected final HConnection hConnection;
|
||||||
protected final TableName tableName;
|
protected final TableName tableName;
|
||||||
protected final ExecutorService pool;
|
protected final ExecutorService pool;
|
||||||
|
@ -98,8 +100,26 @@ class AsyncProcess<CResult> {
|
||||||
protected final AtomicLong tasksDone = new AtomicLong(0);
|
protected final AtomicLong tasksDone = new AtomicLong(0);
|
||||||
protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
|
protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
|
||||||
new ConcurrentHashMap<String, AtomicInteger>();
|
new ConcurrentHashMap<String, AtomicInteger>();
|
||||||
|
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
|
||||||
|
new ConcurrentHashMap<ServerName, AtomicInteger>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of tasks simultaneously executed on the cluster.
|
||||||
|
*/
|
||||||
protected final int maxTotalConcurrentTasks;
|
protected final int maxTotalConcurrentTasks;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of tasks we run in parallel on a single region.
|
||||||
|
* With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
|
||||||
|
* a set of operations on a region before the previous one is done. As well, this limits
|
||||||
|
* the pressure we put on the region server.
|
||||||
|
*/
|
||||||
protected final int maxConcurrentTasksPerRegion;
|
protected final int maxConcurrentTasksPerRegion;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of task simultaneously executed on a single region server.
|
||||||
|
*/
|
||||||
|
protected final int maxConcurrentTasksPerServer;
|
||||||
protected final long pause;
|
protected final long pause;
|
||||||
protected int numTries;
|
protected int numTries;
|
||||||
protected final boolean useServerTrackerForRetries;
|
protected final boolean useServerTrackerForRetries;
|
||||||
|
@ -191,13 +211,22 @@ class AsyncProcess<CResult> {
|
||||||
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||||
|
|
||||||
this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 200);
|
this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 100);
|
||||||
|
this.maxConcurrentTasksPerServer = conf.getInt("hbase.client.max.perserver.tasks", 5);
|
||||||
// With one, we ensure that the ordering of the queries is respected: we don't start
|
|
||||||
// a set of operations on a region before the previous one is done. As well, this limits
|
|
||||||
// the pressure we put on the region server.
|
|
||||||
this.maxConcurrentTasksPerRegion = conf.getInt("hbase.client.max.perregion.tasks", 1);
|
this.maxConcurrentTasksPerRegion = conf.getInt("hbase.client.max.perregion.tasks", 1);
|
||||||
|
|
||||||
|
if (this.maxTotalConcurrentTasks <= 0) {
|
||||||
|
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
|
||||||
|
}
|
||||||
|
if (this.maxConcurrentTasksPerServer <= 0) {
|
||||||
|
throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
|
||||||
|
maxConcurrentTasksPerServer);
|
||||||
|
}
|
||||||
|
if (this.maxConcurrentTasksPerRegion <= 0) {
|
||||||
|
throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
|
||||||
|
maxConcurrentTasksPerRegion);
|
||||||
|
}
|
||||||
|
|
||||||
this.useServerTrackerForRetries =
|
this.useServerTrackerForRetries =
|
||||||
conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
|
conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
|
||||||
|
|
||||||
|
@ -235,15 +264,22 @@ class AsyncProcess<CResult> {
|
||||||
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
|
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
// Wait until there is at least one slot for a new task.
|
||||||
|
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
|
||||||
|
|
||||||
|
// Remember the previous decisions about regions or region servers we put in the
|
||||||
|
// final multi.
|
||||||
Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
|
Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
|
||||||
long currentTaskNumber = waitForMaximumCurrentTasks(maxTotalConcurrentTasks);
|
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
|
||||||
|
|
||||||
int posInList = -1;
|
int posInList = -1;
|
||||||
Iterator<? extends Row> it = rows.iterator();
|
Iterator<? extends Row> it = rows.iterator();
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
Row r = it.next();
|
Row r = it.next();
|
||||||
HRegionLocation loc = findDestLocation(r, 1, posInList, false, regionIncluded);
|
HRegionLocation loc = findDestLocation(r, 1, posInList);
|
||||||
|
|
||||||
if (loc != null) { // loc is null if the dest is too busy or there is an error
|
if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
|
||||||
|
// loc is null if there is an error such as meta not available.
|
||||||
Action<Row> action = new Action<Row>(r, ++posInList);
|
Action<Row> action = new Action<Row>(r, ++posInList);
|
||||||
retainedActions.add(action);
|
retainedActions.add(action);
|
||||||
addAction(loc, action, actionsByServer);
|
addAction(loc, action, actionsByServer);
|
||||||
|
@ -251,10 +287,6 @@ class AsyncProcess<CResult> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (retainedActions.isEmpty() && atLeastOne && !hasError()) {
|
|
||||||
waitForNextTaskDone(currentTaskNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
} while (retainedActions.isEmpty() && atLeastOne && !hasError());
|
} while (retainedActions.isEmpty() && atLeastOne && !hasError());
|
||||||
|
|
||||||
HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
|
HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
|
||||||
|
@ -281,18 +313,14 @@ class AsyncProcess<CResult> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find the destination, if this destination is not considered as busy.
|
* Find the destination.
|
||||||
*
|
*
|
||||||
* @param row the row
|
* @param row the row
|
||||||
* @param numAttempt the num attempt
|
* @param numAttempt the num attempt
|
||||||
* @param posInList the position in the list
|
* @param posInList the position in the list
|
||||||
* @param force if we must submit whatever the server load
|
* @return the destination. Null if we couldn't find it.
|
||||||
* @param regionStatus the
|
|
||||||
* @return null if we should not submit, the destination otherwise.
|
|
||||||
*/
|
*/
|
||||||
private HRegionLocation findDestLocation(Row row, int numAttempt,
|
private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
|
||||||
int posInList, boolean force,
|
|
||||||
Map<String, Boolean> regionStatus) {
|
|
||||||
if (row == null){
|
if (row == null){
|
||||||
throw new IllegalArgumentException("row cannot be null");
|
throw new IllegalArgumentException("row cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -316,30 +344,75 @@ class AsyncProcess<CResult> {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (force) {
|
return loc;
|
||||||
return loc;
|
|
||||||
}
|
|
||||||
|
|
||||||
String regionName = loc.getRegionInfo().getEncodedName();
|
|
||||||
Boolean addIt = regionStatus.get(regionName);
|
|
||||||
if (addIt == null) {
|
|
||||||
addIt = canTakeNewOperations(regionName);
|
|
||||||
regionStatus.put(regionName, addIt);
|
|
||||||
}
|
|
||||||
|
|
||||||
return addIt ? loc : null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if we should send new operations to this region.
|
* Check if we should send new operations to this region or region server.
|
||||||
|
* We're taking into account the past decision; if we have already accepted
|
||||||
|
* operation on a given region, we accept all operations for this region.
|
||||||
*
|
*
|
||||||
* @param encodedRegionName region name
|
*
|
||||||
|
* @param loc; the region and the server name we want to use.
|
||||||
* @return true if this region is considered as busy.
|
* @return true if this region is considered as busy.
|
||||||
*/
|
*/
|
||||||
protected boolean canTakeNewOperations(String encodedRegionName) {
|
protected boolean canTakeOperation(HRegionLocation loc,
|
||||||
AtomicInteger ct = taskCounterPerRegion.get(encodedRegionName);
|
Map<String, Boolean> regionsIncluded,
|
||||||
return ct == null || ct.get() < maxConcurrentTasksPerRegion;
|
Map<ServerName, Boolean> serversIncluded) {
|
||||||
|
String encodedRegionName = loc.getRegionInfo().getEncodedName();
|
||||||
|
Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
|
||||||
|
|
||||||
|
if (regionPrevious != null) {
|
||||||
|
// We already know what to do with this region.
|
||||||
|
return regionPrevious;
|
||||||
|
}
|
||||||
|
|
||||||
|
Boolean serverPrevious = serversIncluded.get(loc.getServerName());
|
||||||
|
if (Boolean.FALSE.equals(serverPrevious)) {
|
||||||
|
// It's a new region, on a region server that we have already excluded.
|
||||||
|
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||||
|
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
|
||||||
|
// Too many tasks on this region already.
|
||||||
|
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (serverPrevious == null) {
|
||||||
|
// The region is ok, but we need to decide for this region server.
|
||||||
|
int newServers = 0; // number of servers we're going to contact so far
|
||||||
|
for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
|
||||||
|
if (kv.getValue()) {
|
||||||
|
newServers++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do we have too many total tasks already?
|
||||||
|
boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;
|
||||||
|
|
||||||
|
if (ok) {
|
||||||
|
// If the total is fine, is it ok for this individual server?
|
||||||
|
AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
|
||||||
|
ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ok) {
|
||||||
|
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||||
|
serversIncluded.put(loc.getServerName(), Boolean.FALSE);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
serversIncluded.put(loc.getServerName(), Boolean.TRUE);
|
||||||
|
} else {
|
||||||
|
assert serverPrevious.equals(Boolean.TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
regionsIncluded.put(encodedRegionName, Boolean.TRUE);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -359,35 +432,27 @@ class AsyncProcess<CResult> {
|
||||||
actions.add(action);
|
actions.add(action);
|
||||||
}
|
}
|
||||||
HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
|
HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
|
||||||
submit(actions, actions, 1, true, errorsByServer);
|
submit(actions, actions, 1, errorsByServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Group a list of actions per region servers, and send them. The created MultiActions are
|
* Group a list of actions per region servers, and send them. The created MultiActions are
|
||||||
* added to the inProgress list.
|
* added to the inProgress list. Does not take into account the region/server load.
|
||||||
*
|
*
|
||||||
* @param initialActions - the full list of the actions in progress
|
* @param initialActions - the full list of the actions in progress
|
||||||
* @param currentActions - the list of row to submit
|
* @param currentActions - the list of row to submit
|
||||||
* @param numAttempt - the current numAttempt (first attempt is 1)
|
* @param numAttempt - the current numAttempt (first attempt is 1)
|
||||||
* @param force - true if we submit the rowList without taking into account the server load
|
|
||||||
*/
|
*/
|
||||||
private void submit(List<Action<Row>> initialActions,
|
private void submit(List<Action<Row>> initialActions,
|
||||||
List<Action<Row>> currentActions, int numAttempt, boolean force,
|
List<Action<Row>> currentActions, int numAttempt,
|
||||||
final HConnectionManager.ServerErrorTracker errorsByServer) {
|
final HConnectionManager.ServerErrorTracker errorsByServer) {
|
||||||
// group per location => regions server
|
// group per location => regions server
|
||||||
final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
|
final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
|
||||||
new HashMap<HRegionLocation, MultiAction<Row>>();
|
new HashMap<HRegionLocation, MultiAction<Row>>();
|
||||||
|
|
||||||
// We have the same policy for a single region per call to submit: we don't want
|
|
||||||
// to send half of the actions because the status changed in the middle. So we keep the
|
|
||||||
// status
|
|
||||||
Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
|
|
||||||
|
|
||||||
for (Action<Row> action : currentActions) {
|
for (Action<Row> action : currentActions) {
|
||||||
HRegionLocation loc = findDestLocation(
|
HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
|
||||||
action.getAction(), 1, action.getOriginalIndex(), force, regionIncluded);
|
|
||||||
|
|
||||||
if (loc != null) {
|
if (loc != null) {
|
||||||
addAction(loc, action, actionsByServer);
|
addAction(loc, action, actionsByServer);
|
||||||
}
|
}
|
||||||
|
@ -414,7 +479,7 @@ class AsyncProcess<CResult> {
|
||||||
for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
|
for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
|
||||||
final HRegionLocation loc = e.getKey();
|
final HRegionLocation loc = e.getKey();
|
||||||
final MultiAction<Row> multi = e.getValue();
|
final MultiAction<Row> multi = e.getValue();
|
||||||
incTaskCounters(multi.getRegions());
|
incTaskCounters(multi.getRegions(), loc.getServerName());
|
||||||
|
|
||||||
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
|
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -433,7 +498,7 @@ class AsyncProcess<CResult> {
|
||||||
|
|
||||||
receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
|
receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
|
||||||
} finally {
|
} finally {
|
||||||
decTaskCounters(multi.getRegions());
|
decTaskCounters(multi.getRegions(), loc.getServerName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -443,7 +508,7 @@ class AsyncProcess<CResult> {
|
||||||
} catch (RejectedExecutionException ree) {
|
} catch (RejectedExecutionException ree) {
|
||||||
// This should never happen. But as the pool is provided by the end user, let's secure
|
// This should never happen. But as the pool is provided by the end user, let's secure
|
||||||
// this a little.
|
// this a little.
|
||||||
decTaskCounters(multi.getRegions());
|
decTaskCounters(multi.getRegions(), loc.getServerName());
|
||||||
LOG.warn("The task was rejected by the pool. This is unexpected." +
|
LOG.warn("The task was rejected by the pool. This is unexpected." +
|
||||||
" Server is " + loc.getServerName(), ree);
|
" Server is " + loc.getServerName(), ree);
|
||||||
// We're likely to fail again, but this will increment the attempt counter, so it will
|
// We're likely to fail again, but this will increment the attempt counter, so it will
|
||||||
|
@ -463,11 +528,10 @@ class AsyncProcess<CResult> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For tests.
|
* For tests.
|
||||||
* @param callable
|
* @param callable: used in tests.
|
||||||
* @return Returns a caller.
|
* @return Returns a caller.
|
||||||
*/
|
*/
|
||||||
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
||||||
// callable is unused.
|
|
||||||
return rpcCallerFactory.<MultiResponse> newCaller();
|
return rpcCallerFactory.<MultiResponse> newCaller();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -543,7 +607,7 @@ class AsyncProcess<CResult> {
|
||||||
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
|
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
|
||||||
initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
|
initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
|
||||||
} else {
|
} else {
|
||||||
submit(initialActions, toReplay, numAttempt, true, errorsByServer);
|
submit(initialActions, toReplay, numAttempt, errorsByServer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -619,7 +683,7 @@ class AsyncProcess<CResult> {
|
||||||
long backOffTime = (errorsByServer != null ?
|
long backOffTime = (errorsByServer != null ?
|
||||||
errorsByServer.calculateBackoffTime(location, pause) :
|
errorsByServer.calculateBackoffTime(location, pause) :
|
||||||
ConnectionUtils.getPauseTime(pause, numAttempt));
|
ConnectionUtils.getPauseTime(pause, numAttempt));
|
||||||
if (numAttempt > 3 && LOG.isDebugEnabled()) {
|
if (numAttempt > START_LOG_ERRORS_CNT && LOG.isDebugEnabled()) {
|
||||||
// We use this value to have some logs when we have multiple failures, but not too many
|
// We use this value to have some logs when we have multiple failures, but not too many
|
||||||
// logs, as errors are to be expected when a region moves, splits and so on
|
// logs, as errors are to be expected when a region moves, splits and so on
|
||||||
LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
|
LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
|
||||||
|
@ -636,10 +700,16 @@ class AsyncProcess<CResult> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
submit(initialActions, toReplay, numAttempt + 1, true, errorsByServer);
|
submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
|
||||||
} else if (failureCount != 0) {
|
} else {
|
||||||
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
|
if (failureCount != 0) {
|
||||||
" ops on " + location.getServerName() + " NOT resubmitting." + location);
|
// We have a failure but nothing to retry. We're done, it's a final failure..
|
||||||
|
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
|
||||||
|
" ops on " + location.getServerName() + " NOT resubmitting." + location);
|
||||||
|
} else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
|
||||||
|
// The operation was successful, but needed several attempts. Let's log this.
|
||||||
|
LOG.debug("Attempt #" + numAttempt + "/" + numTries + " is finally successful.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,7 +734,7 @@ class AsyncProcess<CResult> {
|
||||||
/**
|
/**
|
||||||
* Wait until the async does not have more than max tasks in progress.
|
* Wait until the async does not have more than max tasks in progress.
|
||||||
*/
|
*/
|
||||||
private long waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
|
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
|
||||||
long lastLog = EnvironmentEdgeManager.currentTimeMillis();
|
long lastLog = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
long currentTasksDone = this.tasksDone.get();
|
long currentTasksDone = this.tasksDone.get();
|
||||||
|
|
||||||
|
@ -679,8 +749,10 @@ class AsyncProcess<CResult> {
|
||||||
waitForNextTaskDone(currentTasksDone);
|
waitForNextTaskDone(currentTasksDone);
|
||||||
currentTasksDone = this.tasksDone.get();
|
currentTasksDone = this.tasksDone.get();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return currentTasksDone;
|
private long getCurrentTasksCount(){
|
||||||
|
return tasksSent.get() - tasksDone.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -715,30 +787,39 @@ class AsyncProcess<CResult> {
|
||||||
/**
|
/**
|
||||||
* increment the tasks counters for a given set of regions. MT safe.
|
* increment the tasks counters for a given set of regions. MT safe.
|
||||||
*/
|
*/
|
||||||
protected void incTaskCounters(Collection<byte[]> regions) {
|
protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||||
tasksSent.incrementAndGet();
|
tasksSent.incrementAndGet();
|
||||||
|
|
||||||
|
AtomicInteger serverCnt = taskCounterPerServer.get(sn);
|
||||||
|
if (serverCnt == null) {
|
||||||
|
taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
|
||||||
|
serverCnt = taskCounterPerServer.get(sn);
|
||||||
|
}
|
||||||
|
serverCnt.incrementAndGet();
|
||||||
|
|
||||||
for (byte[] regBytes : regions) {
|
for (byte[] regBytes : regions) {
|
||||||
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
||||||
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
|
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||||
if (counterPerServer == null) {
|
if (regionCnt == null) {
|
||||||
taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
|
taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
|
||||||
counterPerServer = taskCounterPerRegion.get(encodedRegionName);
|
regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||||
}
|
}
|
||||||
counterPerServer.incrementAndGet();
|
regionCnt.incrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decrements the counters for a given region
|
* Decrements the counters for a given region and the region server. MT Safe.
|
||||||
*/
|
*/
|
||||||
protected void decTaskCounters(Collection<byte[]> regions) {
|
protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||||
for (byte[] regBytes : regions) {
|
for (byte[] regBytes : regions) {
|
||||||
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
||||||
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
|
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||||
counterPerServer.decrementAndGet();
|
regionCnt.decrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taskCounterPerServer.get(sn).decrementAndGet();
|
||||||
|
|
||||||
tasksDone.incrementAndGet();
|
tasksDone.incrementAndGet();
|
||||||
synchronized (tasksDone) {
|
synchronized (tasksDone) {
|
||||||
tasksDone.notifyAll();
|
tasksDone.notifyAll();
|
||||||
|
|
|
@ -40,7 +40,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
@ -55,6 +54,7 @@ public class TestAsyncProcess {
|
||||||
TableName.valueOf("DUMMY_TABLE");
|
TableName.valueOf("DUMMY_TABLE");
|
||||||
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
|
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
|
||||||
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
|
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
|
||||||
|
private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
|
||||||
private static final byte[] FAILS = "FAILS".getBytes();
|
private static final byte[] FAILS = "FAILS".getBytes();
|
||||||
private static final Configuration conf = new Configuration();
|
private static final Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
@ -63,8 +63,11 @@ public class TestAsyncProcess {
|
||||||
private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
|
private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
|
||||||
private static HRegionInfo hri2 =
|
private static HRegionInfo hri2 =
|
||||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
|
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
|
||||||
|
private static HRegionInfo hri3 =
|
||||||
|
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW);
|
||||||
private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
|
private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
|
||||||
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
|
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
|
||||||
|
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
|
||||||
|
|
||||||
private static final String success = "success";
|
private static final String success = "success";
|
||||||
private static Exception failure = new Exception("failure");
|
private static Exception failure = new Exception("failure");
|
||||||
|
@ -180,10 +183,15 @@ public class TestAsyncProcess {
|
||||||
@Override
|
@Override
|
||||||
public HRegionLocation locateRegion(final TableName tableName,
|
public HRegionLocation locateRegion(final TableName tableName,
|
||||||
final byte[] row) {
|
final byte[] row) {
|
||||||
Random rd = new Random(Bytes.toLong(row));
|
int i = 0;
|
||||||
int pos = rd.nextInt(hrl.size());
|
for (HRegionLocation hr:hrl){
|
||||||
usedRegions[pos] = true;
|
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
|
||||||
return hrl.get(pos);
|
usedRegions[i] = true;
|
||||||
|
return hr;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,7 +201,7 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertTrue(puts.isEmpty());
|
Assert.assertTrue(puts.isEmpty());
|
||||||
|
@ -206,7 +214,7 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertTrue(puts.isEmpty());
|
Assert.assertTrue(puts.isEmpty());
|
||||||
|
@ -223,13 +231,35 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()));
|
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertEquals(puts.size(), 1);
|
Assert.assertEquals(puts.size(), 1);
|
||||||
|
|
||||||
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()));
|
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
|
||||||
|
ap.submit(puts, false);
|
||||||
|
Assert.assertTrue(puts.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitBusyRegionServer() throws Exception {
|
||||||
|
HConnection hc = createHConnection();
|
||||||
|
AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
|
||||||
|
|
||||||
|
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
|
||||||
|
|
||||||
|
List<Put> puts = new ArrayList<Put>();
|
||||||
|
puts.add(createPut(1, true));
|
||||||
|
puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
|
||||||
|
puts.add(createPut(1, true)); // <== this one will make it, the region is already in
|
||||||
|
puts.add(createPut(2, true)); // <== new region, but the rs is ok
|
||||||
|
|
||||||
|
ap.submit(puts, false);
|
||||||
|
Assert.assertEquals(1, puts.size());
|
||||||
|
|
||||||
|
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertTrue(puts.isEmpty());
|
Assert.assertTrue(puts.isEmpty());
|
||||||
}
|
}
|
||||||
|
@ -241,7 +271,7 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
Put p = createPut(true, false);
|
Put p = createPut(1, false);
|
||||||
puts.add(p);
|
puts.add(p);
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
|
@ -318,7 +348,7 @@ public class TestAsyncProcess {
|
||||||
};
|
};
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
Put p = createPut(true, true);
|
Put p = createPut(1, true);
|
||||||
puts.add(p);
|
puts.add(p);
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
|
@ -342,9 +372,9 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, false));
|
puts.add(createPut(1, false));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertTrue(puts.isEmpty());
|
Assert.assertTrue(puts.isEmpty());
|
||||||
|
@ -359,7 +389,7 @@ public class TestAsyncProcess {
|
||||||
Assert.assertEquals(1, ap.getErrors().actions.size());
|
Assert.assertEquals(1, ap.getErrors().actions.size());
|
||||||
|
|
||||||
|
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
Assert.assertTrue(puts.isEmpty());
|
Assert.assertTrue(puts.isEmpty());
|
||||||
|
|
||||||
|
@ -380,9 +410,9 @@ public class TestAsyncProcess {
|
||||||
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, false));
|
puts.add(createPut(1, false));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
ap.submit(puts, false);
|
ap.submit(puts, false);
|
||||||
ap.waitUntilDone();
|
ap.waitUntilDone();
|
||||||
|
@ -400,7 +430,7 @@ public class TestAsyncProcess {
|
||||||
final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
|
||||||
|
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
ap.incTaskCounters(Arrays.asList("dummy".getBytes()));
|
ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
final Thread myThread = Thread.currentThread();
|
final Thread myThread = Thread.currentThread();
|
||||||
|
@ -413,7 +443,7 @@ public class TestAsyncProcess {
|
||||||
};
|
};
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
|
|
||||||
t.start();
|
t.start();
|
||||||
|
|
||||||
|
@ -429,7 +459,7 @@ public class TestAsyncProcess {
|
||||||
public void run() {
|
public void run() {
|
||||||
Threads.sleep(sleepTime);
|
Threads.sleep(sleepTime);
|
||||||
while (ap.tasksDone.get() > 0) {
|
while (ap.tasksDone.get() > 0) {
|
||||||
ap.decTaskCounters(Arrays.asList("dummy".getBytes()));
|
ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -483,6 +513,11 @@ public class TestAsyncProcess {
|
||||||
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
|
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
|
||||||
Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
|
Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
|
||||||
|
|
||||||
|
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
|
||||||
|
Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
|
||||||
|
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
|
||||||
|
Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
|
||||||
|
|
||||||
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
|
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
|
||||||
Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
|
Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
|
||||||
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
|
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
|
||||||
|
@ -497,7 +532,7 @@ public class TestAsyncProcess {
|
||||||
HConnection hc = createHConnection();
|
HConnection hc = createHConnection();
|
||||||
ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
|
ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
|
||||||
|
|
||||||
Put put = createPut(true, true);
|
Put put = createPut(1, true);
|
||||||
|
|
||||||
Assert.assertEquals(0, ht.getWriteBufferSize());
|
Assert.assertEquals(0, ht.getWriteBufferSize());
|
||||||
ht.put(put);
|
ht.put(put);
|
||||||
|
@ -516,7 +551,7 @@ public class TestAsyncProcess {
|
||||||
ht.setWriteBufferSize(0L);
|
ht.setWriteBufferSize(0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
Put put = createPut(true, false);
|
Put put = createPut(1, false);
|
||||||
|
|
||||||
Assert.assertEquals(0L, ht.currentWriteBufferSize);
|
Assert.assertEquals(0L, ht.currentWriteBufferSize);
|
||||||
try {
|
try {
|
||||||
|
@ -555,7 +590,7 @@ public class TestAsyncProcess {
|
||||||
ht.setAutoFlush(false, true);
|
ht.setAutoFlush(false, true);
|
||||||
ht.setWriteBufferSize(0);
|
ht.setWriteBufferSize(0);
|
||||||
|
|
||||||
Put p = createPut(true, false);
|
Put p = createPut(1, false);
|
||||||
ht.put(p);
|
ht.put(p);
|
||||||
|
|
||||||
ht.ap.waitUntilDone(); // Let's do all the retries.
|
ht.ap.waitUntilDone(); // Let's do all the retries.
|
||||||
|
@ -565,7 +600,7 @@ public class TestAsyncProcess {
|
||||||
// This said, it's not a very easy going behavior. For example, when we insert a list of
|
// This said, it's not a very easy going behavior. For example, when we insert a list of
|
||||||
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
|
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
|
||||||
// manage what was inserted, what was tried but failed, and what was not even tried.
|
// manage what was inserted, what was tried but failed, and what was not even tried.
|
||||||
p = createPut(true, true);
|
p = createPut(1, true);
|
||||||
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
|
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
|
||||||
try {
|
try {
|
||||||
ht.put(p);
|
ht.put(p);
|
||||||
|
@ -584,7 +619,7 @@ public class TestAsyncProcess {
|
||||||
ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||||
ht.setAutoFlush(false, false);
|
ht.setAutoFlush(false, false);
|
||||||
|
|
||||||
Put p = createPut(true, false);
|
Put p = createPut(1, false);
|
||||||
ht.put(p);
|
ht.put(p);
|
||||||
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
|
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
|
||||||
try {
|
try {
|
||||||
|
@ -606,13 +641,13 @@ public class TestAsyncProcess {
|
||||||
ht.connection = new MyConnectionImpl();
|
ht.connection = new MyConnectionImpl();
|
||||||
|
|
||||||
List<Put> puts = new ArrayList<Put>();
|
List<Put> puts = new ArrayList<Put>();
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, false)); // <=== the bad apple, position 4
|
puts.add(createPut(1, false)); // <=== the bad apple, position 4
|
||||||
puts.add(createPut(true, true));
|
puts.add(createPut(1, true));
|
||||||
puts.add(createPut(true, false)); // <=== another bad apple, position 6
|
puts.add(createPut(1, false)); // <=== another bad apple, position 6
|
||||||
|
|
||||||
Object[] res = new Object[puts.size()];
|
Object[] res = new Object[puts.size()];
|
||||||
try {
|
try {
|
||||||
|
@ -650,7 +685,7 @@ public class TestAsyncProcess {
|
||||||
ht.ap.serverTrackerTimeout = 1;
|
ht.ap.serverTrackerTimeout = 1;
|
||||||
|
|
||||||
|
|
||||||
Put p = createPut(true, false);
|
Put p = createPut(1, false);
|
||||||
ht.setAutoFlush(false, false);
|
ht.setAutoFlush(false, false);
|
||||||
ht.put(p);
|
ht.put(p);
|
||||||
|
|
||||||
|
@ -679,14 +714,14 @@ public class TestAsyncProcess {
|
||||||
HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
|
HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
|
||||||
hrls.add(hrl);
|
hrls.add(hrl);
|
||||||
|
|
||||||
Get get = new Get(Bytes.toBytes(i * 10L + 5L));
|
Get get = new Get(Bytes.toBytes(i * 10L));
|
||||||
gets.add(get);
|
gets.add(get);
|
||||||
}
|
}
|
||||||
|
|
||||||
HTable ht = new HTable();
|
HTable ht = new HTable();
|
||||||
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
|
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
|
||||||
ht.connection = con;
|
ht.connection = con;
|
||||||
ht.batch((List) gets);
|
ht.batch(gets);
|
||||||
|
|
||||||
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
|
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
|
||||||
Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
|
Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
|
||||||
|
@ -696,23 +731,31 @@ public class TestAsyncProcess {
|
||||||
for (int i =0; i<NB_REGS; i++){
|
for (int i =0; i<NB_REGS; i++){
|
||||||
if (con.usedRegions[i]) nbReg++;
|
if (con.usedRegions[i]) nbReg++;
|
||||||
}
|
}
|
||||||
Assert.assertTrue("nbReg=" + nbReg, nbReg > NB_REGS / 10);
|
Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param reg1 if true, creates a put on region 1, region 2 otherwise
|
* @param regCnt the region: 1 to 3.
|
||||||
* @param success if true, the put will succeed.
|
* @param success if true, the put will succeed.
|
||||||
* @return a put
|
* @return a put
|
||||||
*/
|
*/
|
||||||
private Put createPut(boolean reg1, boolean success) {
|
private Put createPut(int regCnt, boolean success) {
|
||||||
Put p;
|
Put p;
|
||||||
if (!success) {
|
if (!success) {
|
||||||
p = new Put(FAILS);
|
p = new Put(FAILS);
|
||||||
} else if (reg1) {
|
} else switch (regCnt){
|
||||||
p = new Put(DUMMY_BYTES_1);
|
case 1 :
|
||||||
} else {
|
p = new Put(DUMMY_BYTES_1);
|
||||||
p = new Put(DUMMY_BYTES_2);
|
break;
|
||||||
|
case 2:
|
||||||
|
p = new Put(DUMMY_BYTES_2);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
p = new Put(DUMMY_BYTES_3);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("unknown " + regCnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
|
p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
|
||||||
|
|
Loading…
Reference in New Issue