HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl

This commit is contained in:
CHIA-PING TSAI 2017-03-13 14:33:36 +08:00 committed by Chia-Ping Tsai
parent 7c19490bac
commit e2a070cae0
2 changed files with 40 additions and 42 deletions

View File

@ -28,7 +28,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -181,13 +180,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* Runnable (that can be submitted to thread pool) that submits MultiAction to a * Runnable (that can be submitted to thread pool) that submits MultiAction to a
* single server. The server call is synchronous, therefore we do it on a thread pool. * single server. The server call is synchronous, therefore we do it on a thread pool.
*/ */
private final class SingleServerRequestRunnable implements Runnable { @VisibleForTesting
final class SingleServerRequestRunnable implements Runnable {
private final MultiAction multiAction; private final MultiAction multiAction;
private final int numAttempt; private final int numAttempt;
private final ServerName server; private final ServerName server;
private final Set<CancellableRegionServerCallable> callsInProgress; private final Set<CancellableRegionServerCallable> callsInProgress;
private Long heapSize = null; @VisibleForTesting
private SingleServerRequestRunnable( SingleServerRequestRunnable(
MultiAction multiAction, int numAttempt, ServerName server, MultiAction multiAction, int numAttempt, ServerName server,
Set<CancellableRegionServerCallable> callsInProgress) { Set<CancellableRegionServerCallable> callsInProgress) {
this.multiAction = multiAction; this.multiAction = multiAction;
@ -196,24 +196,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
this.callsInProgress = callsInProgress; this.callsInProgress = callsInProgress;
} }
@VisibleForTesting
long heapSize() {
if (heapSize != null) {
return heapSize;
}
heapSize = 0L;
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
List<Action> actions = e.getValue();
for (Action action: actions) {
Row row = action.getAction();
if (row instanceof Mutation) {
heapSize += ((Mutation) row).heapSize();
}
}
}
return heapSize;
}
@Override @Override
public void run() { public void run() {
AbstractResponse res = null; AbstractResponse res = null;
@ -303,7 +285,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final CancellableRegionServerCallable currentCallable; private final CancellableRegionServerCallable currentCallable;
private final int operationTimeout; private final int operationTimeout;
private final int rpcTimeout; private final int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
private final AsyncProcess asyncProcess; private final AsyncProcess asyncProcess;
/** /**
@ -423,20 +404,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
} }
@VisibleForTesting @VisibleForTesting
Map<ServerName, List<Long>> getRequestHeapSize() { SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server,
return heapSizesByServer; Set<CancellableRegionServerCallable> callsInProgress) {
return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
} }
private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
SingleServerRequestRunnable runnable) {
List<Long> heapCount = heapSizesByServer.get(server);
if (heapCount == null) {
heapCount = new LinkedList<>();
heapSizesByServer.put(server, heapCount);
}
heapCount.add(runnable.heapSize());
return runnable;
}
/** /**
* Group a list of actions per region servers, and send them. * Group a list of actions per region servers, and send them.
* *
@ -608,8 +580,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
} }
asyncProcess.incTaskCounters(multiAction.getRegions(), server); asyncProcess.incTaskCounters(multiAction.getRegions(), server);
SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, SingleServerRequestRunnable runnable = createSingleServerRequest(
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); multiAction, numAttempt, server, callsInProgress);
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
} }
@ -631,8 +603,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
for (DelayingRunner runner : actions.values()) { for (DelayingRunner runner : actions.values()) {
asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction"; String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = addSingleServerRequestHeapSize(server, Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
// use a delay runner only if we need to sleep for some time // use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) { if (runner.getSleepTime() > 0) {
runner.setRunner(runnable); runner.setRunner(runnable);

View File

@ -261,7 +261,7 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
long nonceGroup, AsyncProcess asyncProcess) { long nonceGroup, AsyncProcess asyncProcess) {
super(task, actions, nonceGroup, asyncProcess); super(task, actions, nonceGroup, asyncProcess);
@ -272,6 +272,33 @@ public class TestAsyncProcess {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy. // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
} }
Map<ServerName, List<Long>> getRequestHeapSize() {
return heapSizesByServer;
}
@Override
SingleServerRequestRunnable createSingleServerRequest(
MultiAction multiAction, int numAttempt, ServerName server,
Set<CancellableRegionServerCallable> callsInProgress) {
SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
multiAction, numAttempt, server, callsInProgress);
List<Long> heapCount = heapSizesByServer.get(server);
if (heapCount == null) {
heapCount = new ArrayList<>();
heapSizesByServer.put(server, heapCount);
}
heapCount.add(heapSizeOf(multiAction));
return rq;
}
private long heapSizeOf(MultiAction multiAction) {
return multiAction.actions.values().stream()
.flatMap(v -> v.stream())
.map(action -> action.getAction())
.filter(row -> row instanceof Mutation)
.mapToLong(row -> ((Mutation) row).heapSize())
.sum();
}
} }
static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{ static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
@ -635,7 +662,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) { if (!(req instanceof AsyncRequestFutureImpl)) {
continue; continue;
} }
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
if (ars.getRequestHeapSize().containsKey(sn)) { if (ars.getRequestHeapSize().containsKey(sn)) {
++actualSnReqCount; ++actualSnReqCount;
} }
@ -651,7 +678,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) { if (!(req instanceof AsyncRequestFutureImpl)) {
continue; continue;
} }
AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
long sum = 0; long sum = 0;