diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2aa5ee82d5b..9aba1a29f2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -304,7 +304,7 @@ public class RSRpcServices /** * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} */ - private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; + static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; /** * Whether to reject rows with size > threshold defined by @@ -3245,33 +3245,53 @@ public class RSRpcServices } } - private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { + // visible for testing only + long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, + boolean allowHeartbeatMessages) { // Set the time limit to be half of the more restrictive timeout value (one of the // timeout values must be positive). In the event that both values are positive, the // more restrictive of the two is used to calculate the limit. - if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { - long timeLimitDelta; - if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { - timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); - } else { - timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; + if (allowHeartbeatMessages) { + long now = EnvironmentEdgeManager.currentTime(); + long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); + if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { + long timeLimitDelta; + if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { + timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); + } else { + timeLimitDelta = + scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout; + } + + // Use half of whichever timeout value was more restrictive... But don't allow + // the time limit to be less than the allowable minimum (could cause an + // immediate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + return now + timeLimitDelta; } - if (controller != null && controller.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); - } - // Use half of whichever timeout value was more restrictive... But don't allow - // the time limit to be less than the allowable minimum (could cause an - // immediatate timeout before scanning any data). - timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); - // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a - // ManualEnvironmentEdge. Consider using System.nanoTime instead. - return EnvironmentEdgeManager.currentTime() + timeLimitDelta; } // Default value of timeLimit is negative to indicate no timeLimit should be // enforced. return -1L; } + private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { + long timeout; + if (controller != null && controller.getCallTimeout() > 0) { + timeout = controller.getCallTimeout(); + } else if (rpcTimeout > 0) { + timeout = rpcTimeout; + } else { + return -1; + } + if (call != null) { + timeout -= (now - call.getReceiveTime()); + } + // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. + // return minimum value here in that case so we count this in calculating the final delta. + return Math.max(minimumScanTimeLimitDelta, timeout); + } + private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, ScannerContext scannerContext, ScanResponse.Builder builder) { if (numOfCompleteRows >= limitOfRows) { @@ -3286,7 +3306,7 @@ public class RSRpcServices // return whether we have more results in region. private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, int limitOfRows, List results, - ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) + ScanResponse.Builder builder, MutableObject lastBlock, RpcCall rpcCall) throws IOException { HRegion region = rsh.r; RegionScanner scanner = rsh.s; @@ -3333,7 +3353,7 @@ public class RSRpcServices // heartbeats AND partials boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; - long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); + long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages); final LimitScope sizeScope = allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; @@ -3364,7 +3384,7 @@ public class RSRpcServices // Collect values to be returned here moreRows = scanner.nextRaw(values, scannerContext); - if (context == null) { + if (rpcCall == null) { // When there is no RpcCallContext,copy EC to heap, then the scanner would close, // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap // buffers.See more details in HBASE-26036. @@ -3402,7 +3422,7 @@ public class RSRpcServices } boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); - lastBlock.setValue(addSize(context, r, lastBlock.getValue())); + lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); results.add(r); numOfResults++; if (!mayHaveMoreCellsInRow && limitOfRows > 0) { @@ -3470,7 +3490,7 @@ public class RSRpcServices region.closeRegionOperation(); // Update serverside metrics, even on error. long end = EnvironmentEdgeManager.currentTime(); - long responseCellSize = context != null ? context.getResponseCellSize() : 0; + long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0; region.getMetrics().updateScanTime(end - before); final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); if (metricsRegionServer != null) { @@ -3598,7 +3618,7 @@ public class RSRpcServices } else { rows = closeScanner ? 0 : 1; } - RpcCallContext context = RpcServer.getCurrentCall().orElse(null); + RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); // now let's do the real scan. long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); RegionScanner scanner = rsh.s; @@ -3621,7 +3641,7 @@ public class RSRpcServices Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); if (!results.isEmpty()) { for (Result r : results) { - lastBlock.setValue(addSize(context, r, lastBlock.getValue())); + lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); } } if (bypass != null && bypass.booleanValue()) { @@ -3630,7 +3650,7 @@ public class RSRpcServices } if (!done) { scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, - results, builder, lastBlock, context); + results, builder, lastBlock, rpcCall); } else { builder.setMoreResultsInRegion(!results.isEmpty()); } @@ -3642,7 +3662,7 @@ public class RSRpcServices quota.addScanResult(results); addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), - isClientCellBlockSupport(context)); + isClientCellBlockSupport(rpcCall)); if (scanner.isFilterDone() && results.isEmpty()) { // If the scanner's filter - if any - is done with the scan // only set moreResults to false if the results is empty. This is used to keep compatible @@ -3674,7 +3694,7 @@ public class RSRpcServices } if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { scannerClosed = true; - closeScanner(region, scanner, scannerName, context); + closeScanner(region, scanner, scannerName, rpcCall); } return builder.build(); } catch (IOException e) { @@ -3684,7 +3704,7 @@ public class RSRpcServices // The scanner state might be left in a dirty state, so we will tell the Client to // fail this RPC and close the scanner while opening up another one from the start of // row that the client has last seen. - closeScanner(region, scanner, scannerName, context); + closeScanner(region, scanner, scannerName, rpcCall); // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is // used in two different semantics. @@ -3710,7 +3730,7 @@ public class RSRpcServices // We closed the scanner already. Instead of throwing the IOException, and client // retrying with the same scannerId only to get USE on the next RPC, we directly throw // a special exception to save an RPC. - if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { + if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) { // 1.4.0+ clients know how to handle throw new ScannerResetException("Scanner is closed on the server-side", e); } else { @@ -3725,8 +3745,8 @@ public class RSRpcServices if (!scannerClosed) { // Adding resets expiration time on lease. // the closeCallBack will be set in closeScanner so here we only care about shippedCallback - if (context != null) { - context.setCallBack(rsh.shippedCallback); + if (rpcCall != null) { + rpcCall.setCallBack(rsh.shippedCallback); } else { // If context is null,here we call rsh.shippedCallback directly to reuse the logic in // rsh.shippedCallback to release the internal resources in rsh,and lease is also added diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 2c2b203b0ff..3c621e3ee94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -48,8 +50,11 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; @@ -59,6 +64,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @@ -138,8 +144,42 @@ public class TestScannerHeartbeatMessages { TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } - static Table createTestTable(TableName name, byte[][] rows, byte[][] families, - byte[][] qualifiers, byte[] cellValue) throws IOException { + @Test + public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException { + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + RSRpcServices services = new RSRpcServices(rs); + RpcCall mockRpcCall = Mockito.mock(RpcCall.class); + // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing) + // finally, 25 is fatal levels of queueing -- exceeding timeout + when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L); + + // assume timeout of 100ms + HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class); + when(mockController.getCallTimeout()).thenReturn(100); + + // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas + EnvironmentEdgeManager.injectEdge(() -> 200L); + + try { + // we queued for 20ms, leaving 80ms of timeout, which we divide by 2 + assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); + // we queued for 80ms, leaving 20ms of timeout, which we divide by 2 + assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); + // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum + assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, + services.getTimeLimit(mockRpcCall, mockController, true)); + // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop + // timed out calls in the queue. in this case we still fallback on default minimum for now. + assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, + services.getTimeLimit(mockRpcCall, mockController, true)); + } finally { + EnvironmentEdgeManager.reset(); + } + + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] cellValue) throws IOException { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts);