HBASE-16515 AsyncProcess has incorrent count of tasks if the backoff policy is enabled (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-08-30 08:01:49 -07:00
parent 9cb0094bdb
commit 0f92e943ac
2 changed files with 66 additions and 19 deletions

View File

@ -1066,7 +1066,6 @@ class AsyncProcess {
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server);
Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
numAttempt);
// make sure we correctly count the number of runnables before we try to reuse the send
@ -1114,6 +1113,7 @@ class AsyncProcess {
if (connection.getConnectionMetrics() != null) {
connection.getConnectionMetrics().incrNormalRunners();
}
incTaskCounters(multiAction.getRegions(), server);
SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
@ -1136,6 +1136,7 @@ class AsyncProcess {
List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
for (DelayingRunner runner : actions.values()) {
incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = addSingleServerRequestHeapSize(server,
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
@ -1757,7 +1758,8 @@ class AsyncProcess {
}
}
private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
@VisibleForTesting
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null;
boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null;
if (!stats && !metrics) {

View File

@ -82,24 +82,10 @@ import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -225,6 +211,11 @@ public class TestAsyncProcess {
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
@Override
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
CancellableRegionServerCallable callable) {
@ -295,7 +286,21 @@ public class TestAsyncProcess {
return new CallerWithFailure(ioe);
}
}
/**
* Make the backoff time always different on each call.
*/
static class MyClientBackoffPolicy implements ClientBackoffPolicy {
private final Map<ServerName, AtomicInteger> count = new HashMap<>();
@Override
public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
AtomicInteger inc = count.get(serverName);
if (inc == null) {
inc = new AtomicInteger(0);
count.put(serverName, inc);
}
return inc.getAndIncrement();
}
}
class MyAsyncProcessWithReplicas extends MyAsyncProcess {
private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
private long primarySleepMs = 0, replicaSleepMs = 0;
@ -835,6 +840,46 @@ public class TestAsyncProcess {
Assert.assertEquals(1, ars.getFailedOperations().size());
}
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false);
testTaskCount(ap);
}
@Test
public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
Configuration copyConf = new Configuration(conf);
copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
ClusterConnection hc = createHConnection();
Mockito.when(hc.getConfiguration()).thenReturn(copyConf);
Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
Mockito.when(hc.getBackoffPolicy()).thenReturn(bp);
MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false);
testTaskCount(ap);
}
private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException {
List<Put> puts = new ArrayList<>();
for (int i = 0; i != 3; ++i) {
puts.add(createPut(1, true));
puts.add(createPut(2, true));
puts.add(createPut(3, true));
}
ap.submit(DUMMY_TABLE, puts, true, null, false);
ap.waitUntilDone();
// More time to wait if there are incorrect task count.
TimeUnit.SECONDS.sleep(1);
assertEquals(0, ap.tasksInProgress.get());
for (AtomicInteger count : ap.taskCounterPerRegion.values()) {
assertEquals(0, count.get());
}
for (AtomicInteger count : ap.taskCounterPerServer.values()) {
assertEquals(0, count.get());
}
}
@Test
public void testMaxTask() throws Exception {
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);