diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 41431bbd134..e6e4fd1ea5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -181,13 +180,14 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * Runnable (that can be submitted to thread pool) that submits MultiAction to a * single server. The server call is synchronous, therefore we do it on a thread pool. */ - private final class SingleServerRequestRunnable implements Runnable { + @VisibleForTesting + final class SingleServerRequestRunnable implements Runnable { private final MultiAction multiAction; private final int numAttempt; private final ServerName server; private final Set callsInProgress; - private Long heapSize = null; - private SingleServerRequestRunnable( + @VisibleForTesting + SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, Set callsInProgress) { this.multiAction = multiAction; @@ -196,24 +196,6 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { this.callsInProgress = callsInProgress; } - @VisibleForTesting - long heapSize() { - if (heapSize != null) { - return heapSize; - } - heapSize = 0L; - for (Map.Entry> e: this.multiAction.actions.entrySet()) { - List actions = e.getValue(); - for (Action action: actions) { - Row row = action.getAction(); - if (row instanceof Mutation) { - heapSize += ((Mutation) row).heapSize(); - } - } - } - return heapSize; - } - @Override public void run() { AbstractResponse res = null; @@ -303,7 +285,6 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private final CancellableRegionServerCallable currentCallable; private final int operationTimeout; private final int rpcTimeout; - private final Map> heapSizesByServer = new HashMap<>(); private final AsyncProcess asyncProcess; /** @@ -423,20 +404,11 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } @VisibleForTesting - Map> getRequestHeapSize() { - return heapSizesByServer; + SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, + Set callsInProgress) { + return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); } - private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, - SingleServerRequestRunnable runnable) { - List heapCount = heapSizesByServer.get(server); - if (heapCount == null) { - heapCount = new LinkedList<>(); - heapSizesByServer.put(server, heapCount); - } - heapCount.add(runnable.heapSize()); - return runnable; - } /** * Group a list of actions per region servers, and send them. * @@ -608,8 +580,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); } asyncProcess.incTaskCounters(multiAction.getRegions(), server); - SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + SingleServerRequestRunnable runnable = createSingleServerRequest( + multiAction, numAttempt, server, callsInProgress); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } @@ -631,8 +603,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { for (DelayingRunner runner : actions.values()) { asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); + Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index f2f04673b0f..3139af1c0fd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -261,7 +261,7 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { - + private final Map> heapSizesByServer = new HashMap<>(); public MyAsyncRequestFutureImpl(AsyncProcessTask task, List actions, long nonceGroup, AsyncProcess asyncProcess) { super(task, actions, nonceGroup, asyncProcess); @@ -272,6 +272,33 @@ public class TestAsyncProcess { // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. } + Map> getRequestHeapSize() { + return heapSizesByServer; + } + + @Override + SingleServerRequestRunnable createSingleServerRequest( + MultiAction multiAction, int numAttempt, ServerName server, + Set callsInProgress) { + SingleServerRequestRunnable rq = new SingleServerRequestRunnable( + multiAction, numAttempt, server, callsInProgress); + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new ArrayList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(heapSizeOf(multiAction)); + return rq; + } + + private long heapSizeOf(MultiAction multiAction) { + return multiAction.actions.values().stream() + .flatMap(v -> v.stream()) + .map(action -> action.getAction()) + .filter(row -> row instanceof Mutation) + .mapToLong(row -> ((Mutation) row).heapSize()) + .sum(); + } } static class CallerWithFailure extends RpcRetryingCallerImpl{ @@ -635,7 +662,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; if (ars.getRequestHeapSize().containsKey(sn)) { ++actualSnReqCount; } @@ -651,7 +678,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; Map> requestHeapSize = ars.getRequestHeapSize(); for (Map.Entry> entry : requestHeapSize.entrySet()) { long sum = 0;