HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl

This commit is contained in:
CHIA-PING TSAI 2017-03-13 19:28:57 +08:00 committed by Chia-Ping Tsai
parent d1ea718e43
commit d542b446b8
2 changed files with 69 additions and 57 deletions

View File

@ -404,7 +404,8 @@ class AsyncProcess {
* @return pool if non null, otherwise returns this.pool if non null, otherwise throws * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
* RuntimeException * RuntimeException
*/ */
private ExecutorService getPool(ExecutorService pool) { @VisibleForTesting
ExecutorService getPool(ExecutorService pool) {
if (pool != null) { if (pool != null) {
return pool; return pool;
} }
@ -551,7 +552,8 @@ class AsyncProcess {
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) { ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults); tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null,
operationTimeout, rpcTimeout);
// Add location errors if any // Add location errors if any
if (locationErrors != null) { if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) { for (int i = 0; i < locationErrors.size(); ++i) {
@ -759,13 +761,14 @@ class AsyncProcess {
* Runnable (that can be submitted to thread pool) that submits MultiAction to a * Runnable (that can be submitted to thread pool) that submits MultiAction to a
* single server. The server call is synchronous, therefore we do it on a thread pool. * single server. The server call is synchronous, therefore we do it on a thread pool.
*/ */
private final class SingleServerRequestRunnable implements Runnable { @VisibleForTesting
class SingleServerRequestRunnable implements Runnable {
private final MultiAction<Row> multiAction; private final MultiAction<Row> multiAction;
private final int numAttempt; private final int numAttempt;
private final ServerName server; private final ServerName server;
private final Set<PayloadCarryingServerCallable> callsInProgress; private final Set<PayloadCarryingServerCallable> callsInProgress;
private Long heapSize = null; @VisibleForTesting
private SingleServerRequestRunnable( SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server, MultiAction<Row> multiAction, int numAttempt, ServerName server,
Set<PayloadCarryingServerCallable> callsInProgress) { Set<PayloadCarryingServerCallable> callsInProgress) {
this.multiAction = multiAction; this.multiAction = multiAction;
@ -774,24 +777,6 @@ class AsyncProcess {
this.callsInProgress = callsInProgress; this.callsInProgress = callsInProgress;
} }
@VisibleForTesting
long heapSize() {
if (heapSize != null) {
return heapSize;
}
heapSize = 0L;
for (Map.Entry<byte[], List<Action<Row>>> e: this.multiAction.actions.entrySet()) {
List<Action<Row>> actions = e.getValue();
for (Action<Row> action: actions) {
Row row = action.getAction();
if (row instanceof Mutation) {
heapSize += ((Mutation) row).heapSize();
}
}
}
return heapSize;
}
@Override @Override
public void run() { public void run() {
MultiResponse res; MultiResponse res;
@ -874,7 +859,6 @@ class AsyncProcess {
private PayloadCarryingServerCallable currentCallable; private PayloadCarryingServerCallable currentCallable;
private int operationTimeout; private int operationTimeout;
private int rpcTimeout; private int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
private RetryingTimeTracker tracker; private RetryingTimeTracker tracker;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
@ -961,21 +945,13 @@ class AsyncProcess {
public Set<PayloadCarryingServerCallable> getCallsInProgress() { public Set<PayloadCarryingServerCallable> getCallsInProgress() {
return callsInProgress; return callsInProgress;
} }
@VisibleForTesting @VisibleForTesting
Map<ServerName, List<Long>> getRequestHeapSize() { SingleServerRequestRunnable createSingleServerRequest(MultiAction<Row> multiAction, int numAttempt, ServerName server,
return heapSizesByServer; Set<PayloadCarryingServerCallable> callsInProgress) {
return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
} }
private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
SingleServerRequestRunnable runnable) {
List<Long> heapCount = heapSizesByServer.get(server);
if (heapCount == null) {
heapCount = new LinkedList<>();
heapSizesByServer.put(server, heapCount);
}
heapCount.add(runnable.heapSize());
return runnable;
}
/** /**
* Group a list of actions per region servers, and send them. * Group a list of actions per region servers, and send them.
* *
@ -1148,8 +1124,7 @@ class AsyncProcess {
connection.getConnectionMetrics().incrNormalRunners(); connection.getConnectionMetrics().incrNormalRunners();
} }
incTaskCounters(multiAction.getRegions(), server); incTaskCounters(multiAction.getRegions(), server);
SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, SingleServerRequestRunnable runnable = createSingleServerRequest(multiAction, numAttempt, server, callsInProgress);
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
} }
@ -1172,8 +1147,7 @@ class AsyncProcess {
for (DelayingRunner runner : actions.values()) { for (DelayingRunner runner : actions.values()) {
incTaskCounters(runner.getActions().getRegions(), server); incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction"; String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = addSingleServerRequestHeapSize(server, Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
// use a delay runner only if we need to sleep for some time // use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) { if (runner.getSleepTime() > 0) {
runner.setRunner(runnable); runner.setRunner(runnable);
@ -1829,7 +1803,8 @@ class AsyncProcess {
} }
} }
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( @VisibleForTesting
<CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults, Batch.Callback<CResult> callback, Object[] results, boolean needResults,
PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
@ -1838,16 +1813,6 @@ class AsyncProcess {
results, callback, callable, operationTimeout, rpcTimeout); results, callback, callable, operationTimeout, rpcTimeout);
} }
@VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
return createAsyncRequestFuture(
tableName, actions, nonceGroup, pool, callback, results, needResults, null,
operationTimeout, rpcTimeout);
}
/** /**
* Create a caller. Isolated to be easily overridden in the tests. * Create a caller. Isolated to be easily overridden in the tests.
*/ */

View File

@ -153,13 +153,16 @@ public class TestAsyncProcess {
public AtomicInteger callsCt = new AtomicInteger(); public AtomicInteger callsCt = new AtomicInteger();
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private long previousTimeout = -1; private long previousTimeout = -1;
@Override @Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<Res> callback, Object[] results, boolean needResults) { Batch.Callback<Res> callback, Object[] results, boolean needResults,
PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
// Test HTable has tableName of null, so pass DUMMY_TABLE // Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( MyAsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, results, callback, callable,
operationTimeout, rpcTimeout);
allReqs.add(r); allReqs.add(r);
callsCt.incrementAndGet(); callsCt.incrementAndGet();
return r; return r;
@ -254,6 +257,50 @@ public class TestAsyncProcess {
} }
}; };
} }
class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<Res> callback, PayloadCarryingServerCallable callable,
int operationTimeout, int rpcTimeout) {
super(tableName, actions, nonceGroup, pool, needResults, results, callback, callable, operationTimeout, rpcTimeout);
}
Map<ServerName, List<Long>> getRequestHeapSize() {
return heapSizesByServer;
}
@Override
SingleServerRequestRunnable createSingleServerRequest(
MultiAction<Row> multiAction, int numAttempt, ServerName server,
Set<PayloadCarryingServerCallable> callsInProgress) {
SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
multiAction, numAttempt, server, callsInProgress);
List<Long> heapCount = heapSizesByServer.get(server);
if (heapCount == null) {
heapCount = new ArrayList<>();
heapSizesByServer.put(server, heapCount);
}
heapCount.add(heapSizeOf(multiAction));
return rq;
}
long heapSizeOf(MultiAction<Row> multiAction) {
long sum = 0;
for (List<Action<Row>> actions : multiAction.actions.values()) {
for (Action action : actions) {
Row row = action.getAction();
if (row instanceof Mutation) {
sum += ((Mutation) row).heapSize();
}
}
}
return sum;
}
}
} }
static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{ static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
@ -644,7 +691,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) { if (!(req instanceof AsyncRequestFutureImpl)) {
continue; continue;
} }
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req;
if (ars.getRequestHeapSize().containsKey(sn)) { if (ars.getRequestHeapSize().containsKey(sn)) {
++actualSnReqCount; ++actualSnReqCount;
} }
@ -660,7 +707,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) { if (!(req instanceof AsyncRequestFutureImpl)) {
continue; continue;
} }
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req;
Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
long sum = 0; long sum = 0;