HBASE-27048 Server side scanner time limit should account for time in queue (#4562)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
parent 8296bb488c
commit efe0e14744
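Previously the heartbeat deadline for a scan was computed from the configured rpcTimeout and the controller's call timeout without regard to how long the request had already waited in the RPC call queue, so a heavily queued ScanRequest could be granted a time limit longer than the time its caller was still willing to wait. With this change the server subtracts the queue time (now minus the call's receive time) from the caller's timeout before halving it, and falls back to the configured minimum delta when little or no time remains. The self-contained sketch below illustrates the intended arithmetic only; the names, constants, and values are illustrative, not the patch itself (the real code is in RSRpcServices.getTimeLimit and getRemainingRpcTimeout further down).

// Illustrative sketch of the deadline math introduced by this patch (not the patch itself).
public final class ScanDeadlineSketch {

  // stands in for the minimum scan time limit delta (10 ms by default in RSRpcServices)
  private static final long MIN_TIME_LIMIT_DELTA = 10;

  /** Returns the absolute deadline (ms) by which a heartbeat/partial response should be sent. */
  static long deadline(long now, long receiveTime, long callTimeout, long leaseTimeout) {
    // time already burned waiting in the RPC queue
    long queued = now - receiveTime;
    // what is left of the caller's timeout, clamped so it never goes non-positive
    long remaining = Math.max(MIN_TIME_LIMIT_DELTA, callTimeout - queued);
    // the more restrictive of the scanner lease timeout and the remaining call timeout
    long delta = Math.min(leaseTimeout, remaining);
    // use half of it, but never less than the allowed minimum
    delta = Math.max(delta / 2, MIN_TIME_LIMIT_DELTA);
    return now + delta;
  }

  public static void main(String[] args) {
    long now = 200, callTimeout = 100, lease = 60_000;
    // queued 20 ms -> 200 + (100 - 20) / 2 = 240
    System.out.println(deadline(now, 180, callTimeout, lease));
    // queued 80 ms -> 200 + (100 - 80) / 2 = 210
    System.out.println(deadline(now, 120, callTimeout, lease));
    // queued 175 ms (already past the timeout) -> clamped to 200 + 10 = 210
    System.out.println(deadline(now, 25, callTimeout, lease));
  }
}

Running the sketch prints 240, 210 and 210, matching the expectations in the new testTimeLimitAccountsForQueueTime below.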
RSRpcServices.java

@@ -304,7 +304,7 @@ public class RSRpcServices
   /**
    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
    */
-  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
+  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
 
   /**
    * Whether to reject rows with size > threshold defined by
@@ -3245,33 +3245,53 @@ public class RSRpcServices
     }
   }
 
-  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
+  // visible for testing only
+  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
+    boolean allowHeartbeatMessages) {
     // Set the time limit to be half of the more restrictive timeout value (one of the
     // timeout values must be positive). In the event that both values are positive, the
     // more restrictive of the two is used to calculate the limit.
-    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
+    if (allowHeartbeatMessages) {
+      long now = EnvironmentEdgeManager.currentTime();
+      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
+      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
         long timeLimitDelta;
-        if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
-          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
+        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
+          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
         } else {
-          timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
-        }
-        if (controller != null && controller.getCallTimeout() > 0) {
-          timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
+          timeLimitDelta =
+            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
         }
 
         // Use half of whichever timeout value was more restrictive... But don't allow
         // the time limit to be less than the allowable minimum (could cause an
-        // immediatate timeout before scanning any data).
+        // immediate timeout before scanning any data).
         timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
-        // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
-        // ManualEnvironmentEdge. Consider using System.nanoTime instead.
-        return EnvironmentEdgeManager.currentTime() + timeLimitDelta;
+        return now + timeLimitDelta;
+      }
     }
     // Default value of timeLimit is negative to indicate no timeLimit should be
     // enforced.
     return -1L;
   }
 
+  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
+    long timeout;
+    if (controller != null && controller.getCallTimeout() > 0) {
+      timeout = controller.getCallTimeout();
+    } else if (rpcTimeout > 0) {
+      timeout = rpcTimeout;
+    } else {
+      return -1;
+    }
+    if (call != null) {
+      timeout -= (now - call.getReceiveTime());
+    }
+    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
+    // return minimum value here in that case so we count this in calculating the final delta.
+    return Math.max(minimumScanTimeLimitDelta, timeout);
+  }
+
   private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
     ScannerContext scannerContext, ScanResponse.Builder builder) {
     if (numOfCompleteRows >= limitOfRows) {
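Worked example of the new logic above (numbers illustrative, defaults assumed, scanner lease timeout taken to be well above the minimum): with a 100 ms call timeout, a request that waited 175 ms in the queue has a negative remaining timeout, so getRemainingRpcTimeout clamps it up to minimumScanTimeLimitDelta (10 ms by default); getTimeLimit then takes max(10 / 2, 10) = 10 and returns now + 10, so even a badly queued scan gets a small positive window rather than an immediate timeout before reading any data.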
@@ -3286,7 +3306,7 @@ public class RSRpcServices
   // return whether we have more results in region.
   private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
     long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
-    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
+    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
     throws IOException {
     HRegion region = rsh.r;
     RegionScanner scanner = rsh.s;
@@ -3333,7 +3353,7 @@ public class RSRpcServices
     // heartbeats AND partials
     boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
 
-    long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
+    long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
 
     final LimitScope sizeScope =
       allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
@@ -3364,7 +3384,7 @@ public class RSRpcServices
 
           // Collect values to be returned here
           moreRows = scanner.nextRaw(values, scannerContext);
-          if (context == null) {
+          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
@@ -3402,7 +3422,7 @@ public class RSRpcServices
           }
           boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
           Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
-          lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
+          lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
           results.add(r);
           numOfResults++;
           if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
@@ -3470,7 +3490,7 @@ public class RSRpcServices
       region.closeRegionOperation();
       // Update serverside metrics, even on error.
       long end = EnvironmentEdgeManager.currentTime();
-      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
+      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
       region.getMetrics().updateScanTime(end - before);
       final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
       if (metricsRegionServer != null) {
@@ -3598,7 +3618,7 @@ public class RSRpcServices
     } else {
       rows = closeScanner ? 0 : 1;
     }
-    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
+    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
     // now let's do the real scan.
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     RegionScanner scanner = rsh.s;
@@ -3621,7 +3641,7 @@ public class RSRpcServices
         Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
         if (!results.isEmpty()) {
           for (Result r : results) {
-            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
+            lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
           }
         }
         if (bypass != null && bypass.booleanValue()) {
@@ -3630,7 +3650,7 @@ public class RSRpcServices
       }
       if (!done) {
         scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
-          results, builder, lastBlock, context);
+          results, builder, lastBlock, rpcCall);
       } else {
         builder.setMoreResultsInRegion(!results.isEmpty());
       }
@@ -3642,7 +3662,7 @@ public class RSRpcServices
       quota.addScanResult(results);
       addResults(builder, results, (HBaseRpcController) controller,
         RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
-        isClientCellBlockSupport(context));
+        isClientCellBlockSupport(rpcCall));
       if (scanner.isFilterDone() && results.isEmpty()) {
         // If the scanner's filter - if any - is done with the scan
         // only set moreResults to false if the results is empty. This is used to keep compatible
@@ -3674,7 +3694,7 @@ public class RSRpcServices
       }
       if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
         scannerClosed = true;
-        closeScanner(region, scanner, scannerName, context);
+        closeScanner(region, scanner, scannerName, rpcCall);
       }
       return builder.build();
     } catch (IOException e) {
@@ -3684,7 +3704,7 @@ public class RSRpcServices
       // The scanner state might be left in a dirty state, so we will tell the Client to
       // fail this RPC and close the scanner while opening up another one from the start of
       // row that the client has last seen.
-      closeScanner(region, scanner, scannerName, context);
+      closeScanner(region, scanner, scannerName, rpcCall);
 
       // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
       // used in two different semantics.
@@ -3710,7 +3730,7 @@ public class RSRpcServices
       // We closed the scanner already. Instead of throwing the IOException, and client
       // retrying with the same scannerId only to get USE on the next RPC, we directly throw
       // a special exception to save an RPC.
-      if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+      if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
         // 1.4.0+ clients know how to handle
         throw new ScannerResetException("Scanner is closed on the server-side", e);
       } else {
@@ -3725,8 +3745,8 @@ public class RSRpcServices
     if (!scannerClosed) {
       // Adding resets expiration time on lease.
       // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
-      if (context != null) {
-        context.setCallBack(rsh.shippedCallback);
+      if (rpcCall != null) {
+        rpcCall.setCallBack(rsh.shippedCallback);
       } else {
         // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
         // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
TestScannerHeartbeatMessages.java

@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,8 +50,11 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
@@ -59,6 +64,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@@ -138,8 +144,42 @@ public class TestScannerHeartbeatMessages
     TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
   }
 
-  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
-    byte[][] qualifiers, byte[] cellValue) throws IOException {
+  @Test
+  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    RSRpcServices services = new RSRpcServices(rs);
+    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
+    // first return 180 (minimal queueing), then 120 (more queueing), then 101 (heavy queueing);
+    // finally, 25 is a fatal level of queueing -- exceeding the timeout
+    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
+
+    // assume a timeout of 100ms
+    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
+    when(mockController.getCallTimeout()).thenReturn(100);
+
+    // current time is fixed at 200, so the receive times above translate into queue times of
+    // 20, 80, 99, and 175ms
+    EnvironmentEdgeManager.injectEdge(() -> 200L);
+
+    try {
+      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
+      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
+      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
+      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
+      // we queued for 99ms of the 100ms timeout, leaving only 1ms. we fall back to the default minimum
+      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
+        services.getTimeLimit(mockRpcCall, mockController, true));
+      // lastly, we queued for 175ms of the 100ms timeout. this should be very rare since we drop
+      // timed out calls in the queue. in this case we still fall back on the default minimum for now.
+      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
+        services.getTimeLimit(mockRpcCall, mockController, true));
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  static Table createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
+    byte[] cellValue) throws IOException {
     Table ht = TEST_UTIL.createTable(name, families);
     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
     ht.put(puts);
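To exercise just the new assertion path locally, something like mvn test -Dtest=TestScannerHeartbeatMessages (run from the module that contains the test, presumably hbase-server) should be enough; the existing heartbeat tests in the class are unaffected by this change.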