HBASE-27048 Server side scanner time limit should account for time in queue (#4562)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-07-06 11:57:37 -04:00
parent 588ad6b6eb
commit 679e40e068
2 changed files with 93 additions and 33 deletions

View File

@ -289,7 +289,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/** /**
* Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
*/ */
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
/* /*
* Whether to reject rows with size > threshold defined by * Whether to reject rows with size > threshold defined by
@ -3251,33 +3251,53 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { // visible for testing only
long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
boolean allowHeartbeatMessages) {
// Set the time limit to be half of the more restrictive timeout value (one of the // Set the time limit to be half of the more restrictive timeout value (one of the
// timeout values must be positive). In the event that both values are positive, the // timeout values must be positive). In the event that both values are positive, the
// more restrictive of the two is used to calculate the limit. // more restrictive of the two is used to calculate the limit.
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { if (allowHeartbeatMessages) {
long timeLimitDelta; long now = EnvironmentEdgeManager.currentTime();
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
} else { long timeLimitDelta;
timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
} else {
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
return now + timeLimitDelta;
} }
if (controller != null && controller.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
// XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
// ManualEnvironmentEdge. Consider using System.nanoTime instead.
return System.currentTimeMillis() + timeLimitDelta;
} }
// Default value of timeLimit is negative to indicate no timeLimit should be // Default value of timeLimit is negative to indicate no timeLimit should be
// enforced. // enforced.
return -1L; return -1L;
} }
private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
long timeout;
if (controller != null && controller.getCallTimeout() > 0) {
timeout = controller.getCallTimeout();
} else if (rpcTimeout > 0) {
timeout = rpcTimeout;
} else {
return -1;
}
if (call != null) {
timeout -= (now - call.getReceiveTime());
}
// getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
// return minimum value here in that case so we count this in calculating the final delta.
return Math.max(minimumScanTimeLimitDelta, timeout);
}
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
ScannerContext scannerContext, ScanResponse.Builder builder) { ScannerContext scannerContext, ScanResponse.Builder builder) {
if (numOfCompleteRows >= limitOfRows) { if (numOfCompleteRows >= limitOfRows) {
@ -3291,9 +3311,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// return whether we have more results in region. // return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context) ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
throws IOException { throws IOException {
HRegion region = rsh.r; HRegion region = rsh.r;
RegionScanner scanner = rsh.s; RegionScanner scanner = rsh.s;
long maxResultSize; long maxResultSize;
@ -3339,7 +3359,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// heartbeats AND partials // heartbeats AND partials
boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
final LimitScope sizeScope = final LimitScope sizeScope =
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
@ -3397,7 +3417,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
lastBlock.setValue(addSize(context, r, lastBlock.getValue())); lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
results.add(r); results.add(r);
numOfResults++; numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) { if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
@ -3465,7 +3485,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.closeRegionOperation(); region.closeRegionOperation();
// Update serverside metrics, even on error. // Update serverside metrics, even on error.
long end = EnvironmentEdgeManager.currentTime(); long end = EnvironmentEdgeManager.currentTime();
long responseCellSize = context != null ? context.getResponseCellSize() : 0; long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
region.getMetrics().updateScanTime(end - before); region.getMetrics().updateScanTime(end - before);
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) { if (metricsRegionServer != null) {
@ -3595,7 +3615,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else { } else {
rows = closeScanner ? 0 : 1; rows = closeScanner ? 0 : 1;
} }
RpcCallContext context = RpcServer.getCurrentCall().orElse(null); RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
// now let's do the real scan. // now let's do the real scan.
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
RegionScanner scanner = rsh.s; RegionScanner scanner = rsh.s;
@ -3618,7 +3638,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) { if (!results.isEmpty()) {
for (Result r : results) { for (Result r : results) {
lastBlock.setValue(addSize(context, r, lastBlock.getValue())); lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
} }
} }
if (bypass != null && bypass.booleanValue()) { if (bypass != null && bypass.booleanValue()) {
@ -3627,7 +3647,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
if (!done) { if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, context); results, builder, lastBlock, rpcCall);
} else { } else {
builder.setMoreResultsInRegion(!results.isEmpty()); builder.setMoreResultsInRegion(!results.isEmpty());
} }
@ -3639,7 +3659,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
quota.addScanResult(results); quota.addScanResult(results);
addResults(builder, results, (HBaseRpcController) controller, addResults(builder, results, (HBaseRpcController) controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
isClientCellBlockSupport(context)); isClientCellBlockSupport(rpcCall));
if (scanner.isFilterDone() && results.isEmpty()) { if (scanner.isFilterDone() && results.isEmpty()) {
// If the scanner's filter - if any - is done with the scan // If the scanner's filter - if any - is done with the scan
// only set moreResults to false if the results is empty. This is used to keep compatible // only set moreResults to false if the results is empty. This is used to keep compatible
@ -3671,7 +3691,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
scannerClosed = true; scannerClosed = true;
closeScanner(region, scanner, scannerName, context); closeScanner(region, scanner, scannerName, rpcCall);
} }
return builder.build(); return builder.build();
} catch (IOException e) { } catch (IOException e) {
@ -3681,7 +3701,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// The scanner state might be left in a dirty state, so we will tell the Client to // The scanner state might be left in a dirty state, so we will tell the Client to
// fail this RPC and close the scanner while opening up another one from the start of // fail this RPC and close the scanner while opening up another one from the start of
// row that the client has last seen. // row that the client has last seen.
closeScanner(region, scanner, scannerName, context); closeScanner(region, scanner, scannerName, rpcCall);
// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
// used in two different semantics. // used in two different semantics.
@ -3707,7 +3727,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// We closed the scanner already. Instead of throwing the IOException, and client // We closed the scanner already. Instead of throwing the IOException, and client
// retrying with the same scannerId only to get USE on the next RPC, we directly throw // retrying with the same scannerId only to get USE on the next RPC, we directly throw
// a special exception to save an RPC. // a special exception to save an RPC.
if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
// 1.4.0+ clients know how to handle // 1.4.0+ clients know how to handle
throw new ScannerResetException("Scanner is closed on the server-side", e); throw new ScannerResetException("Scanner is closed on the server-side", e);
} else { } else {
@ -3722,8 +3742,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!scannerClosed) { if (!scannerClosed) {
// Adding resets expiration time on lease. // Adding resets expiration time on lease.
// the closeCallBack will be set in closeScanner so here we only care about shippedCallback // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
if (context != null) { if (rpcCall != null) {
context.setCallBack(rsh.shippedCallback); rpcCall.setCallBack(rsh.shippedCallback);
} else { } else {
// When context != null, adding back the lease will be done in callback set above. // When context != null, adding back the lease will be done in callback set above.
addScannerLeaseBack(lease); addScannerLeaseBack(lease);

View File

@ -17,9 +17,11 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -48,9 +50,12 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After; import org.junit.After;
@ -60,6 +65,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@ -139,6 +145,40 @@ public class TestScannerHeartbeatMessages {
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
} }
@Test
public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
RSRpcServices services = new RSRpcServices(rs);
RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
// first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
// finally, 25 is fatal levels of queueing -- exceeding timeout
when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
// assume timeout of 100ms
HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
when(mockController.getCallTimeout()).thenReturn(100);
// current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
EnvironmentEdgeManager.injectEdge(() -> 200L);
try {
// we queued for 20ms, leaving 80ms of timeout, which we divide by 2
assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
// we queued for 80ms, leaving 20ms of timeout, which we divide by 2
assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
// we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
services.getTimeLimit(mockRpcCall, mockController, true));
// lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
// timed out calls in the queue. in this case we still fallback on default minimum for now.
assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
services.getTimeLimit(mockRpcCall, mockController, true));
} finally {
EnvironmentEdgeManager.reset();
}
}
static Table createTestTable(TableName name, byte[][] rows, byte[][] families, static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException { byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families); Table ht = TEST_UTIL.createTable(name, families);