HBASE-9609 AsyncProcess doesn't incrase all the counters when trying to limit the per region flow.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1525643 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-09-23 16:50:12 +00:00
parent bba51c29c7
commit 5ebfbb9abb
2 changed files with 30 additions and 23 deletions

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -41,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -408,14 +410,11 @@ class AsyncProcess<CResult> {
Map<HRegionLocation, MultiAction<Row>> actionsByServer,
final int numAttempt,
final HConnectionManager.ServerErrorTracker errorsByServer) {
// Send the queries and add them to the inProgress list
for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
final HRegionLocation loc = e.getKey();
final MultiAction<Row> multi = e.getValue();
final String regionName = loc.getRegionInfo().getEncodedName();
incTaskCounters(regionName);
incTaskCounters(multi.getRegions());
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
@Override
@ -426,14 +425,15 @@ class AsyncProcess<CResult> {
try {
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
LOG.warn("The call to the RS failed, we don't know where we stand, " + loc, e);
LOG.warn("The call to the region server failed, we don't know where we stand, " +
loc.getServerName(), e);
resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
return;
}
receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
} finally {
decTaskCounters(regionName);
decTaskCounters(multi.getRegions());
}
}
});
@ -443,8 +443,9 @@ class AsyncProcess<CResult> {
} catch (RejectedExecutionException ree) {
// This should never happen. But as the pool is provided by the end user, let's secure
// this a little.
decTaskCounters(regionName);
LOG.warn("The task was rejected by the pool. This is unexpected. " + loc, ree);
decTaskCounters(multi.getRegions());
LOG.warn("The task was rejected by the pool. This is unexpected." +
" Server is " + loc.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
@ -540,7 +541,7 @@ class AsyncProcess<CResult> {
if (toReplay.isEmpty()) {
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
initialActions.size() + "ops, NOT resubmitting, " + location);
initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
} else {
submit(initialActions, toReplay, numAttempt, true, errorsByServer);
}
@ -712,25 +713,31 @@ class AsyncProcess<CResult> {
}
/**
* incrementer the tasks counters for a given region. MT safe.
* increment the tasks counters for a given set of regions. MT safe.
*/
protected void incTaskCounters(String encodedRegionName) {
protected void incTaskCounters(Collection<byte[]> regions) {
tasksSent.incrementAndGet();
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
if (counterPerServer == null) {
taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
counterPerServer = taskCounterPerRegion.get(encodedRegionName);
for (byte[] regBytes : regions) {
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
if (counterPerServer == null) {
taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
counterPerServer = taskCounterPerRegion.get(encodedRegionName);
}
counterPerServer.incrementAndGet();
}
counterPerServer.incrementAndGet();
}
/**
* Decrements the counters for a given region
*/
protected void decTaskCounters(String encodedRegionName) {
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
counterPerServer.decrementAndGet();
protected void decTaskCounters(Collection<byte[]> regions) {
for (byte[] regBytes : regions) {
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
counterPerServer.decrementAndGet();
}
tasksDone.incrementAndGet();
synchronized (tasksDone) {

View File

@ -174,11 +174,11 @@ public class TestAsyncProcess {
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(true, true));
ap.incTaskCounters(hri1.getEncodedName());
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()));
ap.submit(puts, false);
Assert.assertEquals(puts.size(), 1);
ap.decTaskCounters(hri1.getEncodedName());
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()));
ap.submit(puts, false);
Assert.assertTrue(puts.isEmpty());
}
@ -349,7 +349,7 @@ public class TestAsyncProcess {
final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
for (int i = 0; i < 1000; i++) {
ap.incTaskCounters("dummy");
ap.incTaskCounters(Arrays.asList("dummy".getBytes()));
}
final Thread myThread = Thread.currentThread();
@ -378,7 +378,7 @@ public class TestAsyncProcess {
public void run() {
Threads.sleep(sleepTime);
while (ap.tasksDone.get() > 0) {
ap.decTaskCounters("dummy");
ap.decTaskCounters(Arrays.asList("dummy".getBytes()));
}
}
};