HBASE-25215 TestClientOperationTimeout.testScanTimeout is flaky (#2583)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
e5d4e2fc81
commit
164cc5a3dc
|
@ -271,7 +271,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
|
|||
public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
|
||||
ConfigurationObserver {
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
|
||||
|
||||
/** RPC scheduler to use for the region server. */
|
||||
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
|
||||
|
|
|
@ -17,9 +17,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
|
@ -27,12 +31,12 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
|
@ -40,12 +44,13 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
|
@ -69,6 +74,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
|||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestClientOperationTimeout {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationTimeout.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
|
||||
|
@ -91,7 +98,7 @@ public class TestClientOperationTimeout {
|
|||
private static Table TABLE;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass() throws Exception {
|
||||
public static void setUp() throws Exception {
|
||||
// Set RegionServer class and use default values for other options.
|
||||
StartMiniClusterOption option =
|
||||
StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build();
|
||||
|
@ -108,14 +115,6 @@ public class TestClientOperationTimeout {
|
|||
TABLE = CONN.getTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
DELAY_GET = 0;
|
||||
DELAY_SCAN = 0;
|
||||
DELAY_MUTATE = 0;
|
||||
DELAY_BATCH_MUTATE = 0;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
Closeables.close(TABLE, true);
|
||||
|
@ -123,6 +122,14 @@ public class TestClientOperationTimeout {
|
|||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpBeforeTest() throws Exception {
|
||||
DELAY_GET = 0;
|
||||
DELAY_SCAN = 0;
|
||||
DELAY_MUTATE = 0;
|
||||
DELAY_BATCH_MUTATE = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
|
||||
* longer than 'hbase.client.operation.timeout'.
|
||||
|
@ -132,10 +139,11 @@ public class TestClientOperationTimeout {
|
|||
DELAY_GET = 600;
|
||||
try {
|
||||
TABLE.get(new Get(ROW));
|
||||
Assert.fail("should not reach here");
|
||||
fail("should not reach here");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(
|
||||
e instanceof RetriesExhaustedException && e.getCause() instanceof CallTimeoutException);
|
||||
LOG.info("Got exception for get", e);
|
||||
assertThat(e, instanceOf(RetriesExhaustedException.class));
|
||||
assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,10 +158,11 @@ public class TestClientOperationTimeout {
|
|||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
try {
|
||||
TABLE.put(put);
|
||||
Assert.fail("should not reach here");
|
||||
fail("should not reach here");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(
|
||||
e instanceof RetriesExhaustedException && e.getCause() instanceof CallTimeoutException);
|
||||
LOG.info("Got exception for put", e);
|
||||
assertThat(e, instanceOf(RetriesExhaustedException.class));
|
||||
assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,20 +173,17 @@ public class TestClientOperationTimeout {
|
|||
@Test
|
||||
public void testMultiPutsTimeout() {
|
||||
DELAY_BATCH_MUTATE = 600;
|
||||
Put put1 = new Put(ROW);
|
||||
put1.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
Put put2 = new Put(ROW);
|
||||
put2.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
puts.add(put1);
|
||||
puts.add(put2);
|
||||
Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
Put put2 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
List<Put> puts = Arrays.asList(put1, put2);
|
||||
try {
|
||||
TABLE.batch(puts, new Object[2]);
|
||||
Assert.fail("should not reach here");
|
||||
fail("should not reach here");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(
|
||||
e instanceof RetriesExhaustedException && e.getCause() instanceof RetriesExhaustedException
|
||||
&& e.getCause().getCause() instanceof CallTimeoutException);
|
||||
LOG.info("Got exception for batch", e);
|
||||
assertThat(e, instanceOf(RetriesExhaustedException.class));
|
||||
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
|
||||
assertThat(e.getCause().getCause(), instanceOf(CallTimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,19 +192,26 @@ public class TestClientOperationTimeout {
|
|||
* longer than 'hbase.client.scanner.timeout.period'.
|
||||
*/
|
||||
@Test
|
||||
public void testScanTimeout() {
|
||||
public void testScanTimeout() throws IOException, InterruptedException {
|
||||
// cache the region location.
|
||||
try (RegionLocator locator = TABLE.getRegionLocator()) {
|
||||
locator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
// sleep a bit to make sure the location has been cached as it is an async operation.
|
||||
Thread.sleep(100);
|
||||
DELAY_SCAN = 600;
|
||||
try {
|
||||
ResultScanner scanner = TABLE.getScanner(new Scan());
|
||||
try (ResultScanner scanner = TABLE.getScanner(new Scan())) {
|
||||
scanner.next();
|
||||
Assert.fail("should not reach here");
|
||||
fail("should not reach here");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(
|
||||
e instanceof RetriesExhaustedException && e.getCause() instanceof TimeoutIOException);
|
||||
LOG.info("Got exception for scan", e);
|
||||
assertThat(e, instanceOf(RetriesExhaustedException.class));
|
||||
assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
public static final class DelayedRegionServer
|
||||
extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
|
||||
super(conf);
|
||||
}
|
||||
|
@ -212,14 +225,14 @@ public class TestClientOperationTimeout {
|
|||
/**
|
||||
* This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
|
||||
*/
|
||||
public static class DelayedRSRpcServices extends RSRpcServices {
|
||||
private static final class DelayedRSRpcServices extends RSRpcServices {
|
||||
DelayedRSRpcServices(HRegionServer rs) throws IOException {
|
||||
super(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
|
||||
throws ServiceException {
|
||||
throws ServiceException {
|
||||
try {
|
||||
Thread.sleep(DELAY_GET);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -230,7 +243,7 @@ public class TestClientOperationTimeout {
|
|||
|
||||
@Override
|
||||
public ClientProtos.MutateResponse mutate(RpcController rpcc,
|
||||
ClientProtos.MutateRequest request) throws ServiceException {
|
||||
ClientProtos.MutateRequest request) throws ServiceException {
|
||||
try {
|
||||
Thread.sleep(DELAY_MUTATE);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -241,7 +254,7 @@ public class TestClientOperationTimeout {
|
|||
|
||||
@Override
|
||||
public ClientProtos.ScanResponse scan(RpcController controller,
|
||||
ClientProtos.ScanRequest request) throws ServiceException {
|
||||
ClientProtos.ScanRequest request) throws ServiceException {
|
||||
try {
|
||||
Thread.sleep(DELAY_SCAN);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -252,7 +265,7 @@ public class TestClientOperationTimeout {
|
|||
|
||||
@Override
|
||||
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
|
||||
throws ServiceException {
|
||||
throws ServiceException {
|
||||
try {
|
||||
Thread.sleep(DELAY_BATCH_MUTATE);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
Loading…
Reference in New Issue