diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 5bd9a4f58ad..73cafc11236 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -404,7 +404,8 @@ class AsyncProcess { * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException */ - private ExecutorService getPool(ExecutorService pool) { + @VisibleForTesting + ExecutorService getPool(ExecutorService pool) { if (pool != null) { return pool; } @@ -551,7 +552,8 @@ class AsyncProcess { List locationErrorRows, Map> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, + operationTimeout, rpcTimeout); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -759,13 +761,14 @@ class AsyncProcess { * Runnable (that can be submitted to thread pool) that submits MultiAction to a * single server. The server call is synchronous, therefore we do it on a thread pool. */ - private final class SingleServerRequestRunnable implements Runnable { + @VisibleForTesting + class SingleServerRequestRunnable implements Runnable { private final MultiAction multiAction; private final int numAttempt; private final ServerName server; private final Set callsInProgress; - private Long heapSize = null; - private SingleServerRequestRunnable( + @VisibleForTesting + SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, Set callsInProgress) { this.multiAction = multiAction; @@ -774,24 +777,6 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } - @VisibleForTesting - long heapSize() { - if (heapSize != null) { - return heapSize; - } - heapSize = 0L; - for (Map.Entry>> e: this.multiAction.actions.entrySet()) { - List> actions = e.getValue(); - for (Action action: actions) { - Row row = action.getAction(); - if (row instanceof Mutation) { - heapSize += ((Mutation) row).heapSize(); - } - } - } - return heapSize; - } - @Override public void run() { MultiResponse res; @@ -874,7 +859,6 @@ class AsyncProcess { private PayloadCarryingServerCallable currentCallable; private int operationTimeout; private int rpcTimeout; - private final Map> heapSizesByServer = new HashMap<>(); private RetryingTimeTracker tracker; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, @@ -961,21 +945,13 @@ class AsyncProcess { public Set getCallsInProgress() { return callsInProgress; } + @VisibleForTesting - Map> getRequestHeapSize() { - return heapSizesByServer; + SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, + Set callsInProgress) { + return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); } - private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, - SingleServerRequestRunnable runnable) { - List heapCount = heapSizesByServer.get(server); - if (heapCount == null) { - heapCount = new LinkedList<>(); - heapSizesByServer.put(server, heapCount); - } - heapCount.add(runnable.heapSize()); - return runnable; - } /** * Group a list of actions per region servers, and send them. * @@ -1148,8 +1124,7 @@ class AsyncProcess { connection.getConnectionMetrics().incrNormalRunners(); } incTaskCounters(multiAction.getRegions(), server); - SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + SingleServerRequestRunnable runnable = createSingleServerRequest(multiAction, numAttempt, server, callsInProgress); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } @@ -1172,8 +1147,7 @@ class AsyncProcess { for (DelayingRunner runner : actions.values()) { incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); + Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1829,7 +1803,8 @@ class AsyncProcess { } } - protected AsyncRequestFutureImpl createAsyncRequestFuture( + @VisibleForTesting + AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { @@ -1838,16 +1813,6 @@ class AsyncProcess { results, callback, callable, operationTimeout, rpcTimeout); } - @VisibleForTesting - /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ - protected AsyncRequestFutureImpl createAsyncRequestFuture( - TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { - return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, - operationTimeout, rpcTimeout); - } - /** * Create a caller. Isolated to be easily overridden in the tests. */ diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 027d362b824..8c0b7df26d4 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -153,13 +153,16 @@ public class TestAsyncProcess { public AtomicInteger callsCt = new AtomicInteger(); private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); private long previousTimeout = -1; + @Override - protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, + AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { + Batch.Callback callback, Object[] results, boolean needResults, + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE - AsyncRequestFutureImpl r = super.createAsyncRequestFuture( - DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); + MyAsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl( + DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, results, callback, callable, + operationTimeout, rpcTimeout); allReqs.add(r); callsCt.incrementAndGet(); return r; @@ -254,6 +257,50 @@ public class TestAsyncProcess { } }; } + + class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { + + private final Map> heapSizesByServer = new HashMap<>(); + + MyAsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, PayloadCarryingServerCallable callable, + int operationTimeout, int rpcTimeout) { + super(tableName, actions, nonceGroup, pool, needResults, results, callback, callable, operationTimeout, rpcTimeout); + } + + Map> getRequestHeapSize() { + return heapSizesByServer; + } + + @Override + SingleServerRequestRunnable createSingleServerRequest( + MultiAction multiAction, int numAttempt, ServerName server, + Set callsInProgress) { + SingleServerRequestRunnable rq = new SingleServerRequestRunnable( + multiAction, numAttempt, server, callsInProgress); + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new ArrayList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(heapSizeOf(multiAction)); + return rq; + } + + long heapSizeOf(MultiAction multiAction) { + long sum = 0; + for (List> actions : multiAction.actions.values()) { + for (Action action : actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + sum += ((Mutation) row).heapSize(); + } + } + } + return sum; + } + } } static class CallerWithFailure extends RpcRetryingCaller{ @@ -644,7 +691,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req; if (ars.getRequestHeapSize().containsKey(sn)) { ++actualSnReqCount; } @@ -660,7 +707,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req; Map> requestHeapSize = ars.getRequestHeapSize(); for (Map.Entry> entry : requestHeapSize.entrySet()) { long sum = 0;