HBASE-27149 Server should close scanner if client times out before results are ready (#4604)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
6e66e78177
commit
886e31952f
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
|||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
|
||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
|
||||
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -112,7 +113,13 @@ public class MetricsHBaseServer {
|
|||
} else if (throwable instanceof UnknownScannerException) {
|
||||
source.unknownScannerException();
|
||||
} else if (throwable instanceof ScannerResetException) {
|
||||
source.scannerResetException();
|
||||
if (throwable.getCause() instanceof TimeoutIOException) {
|
||||
// Thrown by RSRpcServices, this is more accurately reported as a timeout,
|
||||
// since client will never see the actual reset exception
|
||||
source.callTimedOut();
|
||||
} else {
|
||||
source.scannerResetException();
|
||||
}
|
||||
} else if (throwable instanceof RegionMovedException) {
|
||||
source.movedRegionException();
|
||||
} else if (throwable instanceof NotServingRegionException) {
|
||||
|
|
|
@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
|||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
|
@ -3679,6 +3680,12 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
|
|||
scannerClosed = true;
|
||||
closeScanner(region, scanner, scannerName, rpcCall);
|
||||
}
|
||||
|
||||
// There's no point returning to a timed out client. Throwing ensures scanner is closed
|
||||
if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
|
||||
throw new TimeoutIOException("Client deadline exceeded, cannot return results");
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompatibilityFactory;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
@Category({ RegionServerTests.class, MediumTests.class })
|
||||
public class TestScannerTimeoutHandling {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestScannerTimeoutHandling.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestScannerTimeoutHandling.class);
|
||||
|
||||
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||
private static final MetricsAssertHelper METRICS_ASSERT =
|
||||
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
|
||||
private static final int TIMEOUT = 3000;
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("foo");
|
||||
private static Connection CONN;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// Don't report so often so easier to see other rpcs
|
||||
conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
|
||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
|
||||
// conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
|
||||
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TEST_UTIL.createTable(TABLE_NAME, "0");
|
||||
|
||||
CONN = ConnectionFactory.createConnection(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
CONN.close();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* If a client's timeout would be exceeded before scan results are ready, there is no point
|
||||
* returning results to the client. Worse, for openScanner calls, the client cannot close the
|
||||
* timed out scanner so this leaks scanners on the server. This test verifies that we properly
|
||||
* track and cleanup scanners when a client timeout is exceeded. This should be more rare when
|
||||
* heartbeats are enabled, since we try to return before timeout there. But it's still possible if
|
||||
* queueing eats up most of the timeout or the inner workings of the scan were slowed down enough
|
||||
* to still exceed the timeout despite the calculated heartbeat deadline.
|
||||
*/
|
||||
@Test
|
||||
public void testExceededClientDeadline() throws Exception {
|
||||
Table table = CONN.getTable(TABLE_NAME);
|
||||
|
||||
// put some rows so that our scanner doesn't complete on the first rpc.
|
||||
// this would prematurely close the scanner before the timeout handling has a chance to
|
||||
for (int i = 0; i < 10; i++) {
|
||||
table.put(new Put(Bytes.toBytes(i)).addColumn(new byte[] { '0' }, new byte[] { '0' },
|
||||
new byte[] { '0' }));
|
||||
}
|
||||
|
||||
try {
|
||||
ResultScanner scanner = table.getScanner(new Scan().setCaching(1).setMaxResultSize(1));
|
||||
scanner.next();
|
||||
} catch (RetriesExhaustedException e) {
|
||||
assertTrue(e.getCause() instanceof CallTimeoutException);
|
||||
} finally {
|
||||
// ensure the scan has finished on the server side
|
||||
RSRpcServicesWithScanTimeout.lock.tryLock(60, TimeUnit.SECONDS);
|
||||
// there should be 0 running scanners, meaning the scanner was properly closed on the server
|
||||
assertEquals(0, RSRpcServicesWithScanTimeout.scannerCount);
|
||||
// we should have caught the expected exception
|
||||
assertTrue(RSRpcServicesWithScanTimeout.caughtTimeoutException);
|
||||
// we should have incremented the callTimedOut metric
|
||||
METRICS_ASSERT.assertCounterGt("exceptions.callTimedOut", 0, TEST_UTIL.getHBaseCluster()
|
||||
.getRegionServer(0).getRpcServer().getMetrics().getMetricsSource());
|
||||
}
|
||||
}
|
||||
|
||||
private static class RegionServerWithScanTimeout
|
||||
extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
public RegionServerWithScanTimeout(Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RSRpcServices createRpcServices() throws IOException {
|
||||
return new RSRpcServicesWithScanTimeout(this);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
|
||||
|
||||
private static boolean caughtTimeoutException = false;
|
||||
private static int scannerCount = -1;
|
||||
private static final Lock lock = new ReentrantLock();
|
||||
|
||||
public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
|
||||
super(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientProtos.ScanResponse scan(final RpcController controller,
|
||||
final ClientProtos.ScanRequest request) throws ServiceException {
|
||||
|
||||
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
|
||||
if (regionName.contains(TABLE_NAME.getNameAsString())) {
|
||||
|
||||
// if the client's timeout is exceeded, it may either retry or attempt to close
|
||||
// the scanner. we don't want to allow either until we've verified the server handling.
|
||||
// so only allow 1 request at a time to our test table
|
||||
try {
|
||||
if (!lock.tryLock(60, TimeUnit.SECONDS)) {
|
||||
throw new ServiceException("Failed to get lock");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("SLEEPING");
|
||||
Thread.sleep(TIMEOUT * 2);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
try {
|
||||
return super.scan(controller, request);
|
||||
} catch (ServiceException e) {
|
||||
if (
|
||||
e.getCause() instanceof ScannerResetException
|
||||
&& e.getCause().getCause() instanceof TimeoutIOException
|
||||
) {
|
||||
LOG.info("caught EXPECTED exception in scan after sleep", e);
|
||||
caughtTimeoutException = true;
|
||||
} else {
|
||||
LOG.warn("caught UNEXPECTED exception in scan after sleep", e);
|
||||
}
|
||||
} finally {
|
||||
scannerCount = getScannersCount();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
return super.scan(controller, request);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue