HBASE-16615 Fix flaky TestScannerHeartbeatMessages (Duo Zhang)
This commit is contained in:
parent
19fffacd3a
commit
b8857b0a7b
|
@ -22,6 +22,9 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
|
@ -67,9 +71,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Here we test to make sure that scans return the expected Results when the server is sending the
|
||||
* Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
|
||||
|
@ -117,12 +118,10 @@ public class TestScannerHeartbeatMessages {
|
|||
|
||||
// In this test, we sleep after reading each row. So we should make sure after we get some number
|
||||
// of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
|
||||
// So set this to 200, we will get 3 rows and reach time limit at the start of 4th row, then sleep
|
||||
// for the 4th time. Total time is 800 ms so we will not timeout.
|
||||
private static int DEFAULT_ROW_SLEEP_TIME = 200;
|
||||
private static int DEFAULT_ROW_SLEEP_TIME = 300;
|
||||
|
||||
// Similar with row sleep time.
|
||||
private static int DEFAULT_CF_SLEEP_TIME = 200;
|
||||
private static int DEFAULT_CF_SLEEP_TIME = 300;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -182,7 +181,6 @@ public class TestScannerHeartbeatMessages {
|
|||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.deleteTable(TABLE_NAME);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -196,19 +194,6 @@ public class TestScannerHeartbeatMessages {
|
|||
disableSleeping();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a variety of scan configurations to ensure that they return the expected Results when
|
||||
* heartbeat messages are necessary. These tests are accumulated under one test case to ensure
|
||||
* that they don't run in parallel. If the tests ran in parallel, they may conflict with each
|
||||
* other due to changing static variables
|
||||
*/
|
||||
@Test
|
||||
public void testScannerHeartbeatMessages() throws Exception {
|
||||
testImportanceOfHeartbeats(testHeartbeatBetweenRows());
|
||||
testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
|
||||
testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
|
||||
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
|
||||
|
@ -216,7 +201,7 @@ public class TestScannerHeartbeatMessages {
|
|||
* @param testCallable
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
|
||||
private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
|
||||
HeartbeatRPCServices.heartbeatsEnabled = true;
|
||||
|
||||
try {
|
||||
|
@ -243,8 +228,9 @@ public class TestScannerHeartbeatMessages {
|
|||
* fetched.
|
||||
* @throws Exception
|
||||
*/
|
||||
public Callable<Void> testHeartbeatBetweenRows() throws Exception {
|
||||
return new Callable<Void>() {
|
||||
@Test
|
||||
public void testHeartbeatBetweenRows() throws Exception {
|
||||
testImportanceOfHeartbeats(new Callable<Void>() {
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
|
@ -257,15 +243,16 @@ public class TestScannerHeartbeatMessages {
|
|||
testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the case that the time limit for scans is reached in between column families
|
||||
* @throws Exception
|
||||
*/
|
||||
public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
|
||||
return new Callable<Void>() {
|
||||
@Test
|
||||
public void testHeartbeatBetweenColumnFamilies() throws Exception {
|
||||
testImportanceOfHeartbeats(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
// Configure the scan so that it can read the entire table in a single RPC. We want to test
|
||||
|
@ -282,24 +269,23 @@ public class TestScannerHeartbeatMessages {
|
|||
testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
public static class SparseFilter extends FilterBase{
|
||||
public static class SparseFilter extends FilterBase {
|
||||
|
||||
@Override
|
||||
public ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
try {
|
||||
Thread.sleep(CLIENT_TIMEOUT/2 + 10);
|
||||
Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
|
||||
ReturnCode.INCLUDE :
|
||||
ReturnCode.SKIP;
|
||||
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
|
||||
: ReturnCode.SKIP;
|
||||
}
|
||||
|
||||
public static Filter parseFrom(final byte [] pbBytes){
|
||||
public static Filter parseFrom(final byte[] pbBytes) {
|
||||
return new SparseFilter();
|
||||
}
|
||||
}
|
||||
|
@ -308,8 +294,9 @@ public class TestScannerHeartbeatMessages {
|
|||
* Test the case that there is a filter which filters most of cells
|
||||
* @throws Exception
|
||||
*/
|
||||
public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
|
||||
return new Callable<Void>() {
|
||||
@Test
|
||||
public void testHeartbeatWithSparseFilter() throws Exception {
|
||||
testImportanceOfHeartbeats(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Scan scan = new Scan();
|
||||
|
@ -339,7 +326,7 @@ public class TestScannerHeartbeatMessages {
|
|||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -352,7 +339,7 @@ public class TestScannerHeartbeatMessages {
|
|||
* that column family are fetched
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
|
||||
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
|
||||
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
|
||||
disableSleeping();
|
||||
final ResultScanner scanner = TABLE.getScanner(scan);
|
||||
|
@ -427,14 +414,14 @@ public class TestScannerHeartbeatMessages {
|
|||
* Custom RSRpcServices instance that allows heartbeat support to be toggled
|
||||
*/
|
||||
private static class HeartbeatRPCServices extends RSRpcServices {
|
||||
private static boolean heartbeatsEnabled = true;
|
||||
private static volatile boolean heartbeatsEnabled = true;
|
||||
|
||||
public HeartbeatRPCServices(HRegionServer rs) throws IOException {
|
||||
super(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScanResponse scan(RpcController controller, ScanRequest request)
|
||||
public ScanResponse scan(RpcController controller, ScanRequest request)
|
||||
throws ServiceException {
|
||||
ScanRequest.Builder builder = ScanRequest.newBuilder(request);
|
||||
builder.setClientHandlesHeartbeats(heartbeatsEnabled);
|
||||
|
@ -449,17 +436,17 @@ public class TestScannerHeartbeatMessages {
|
|||
*/
|
||||
private static class HeartbeatHRegion extends HRegion {
|
||||
// Row sleeps occur AFTER each row worth of cells is retrieved.
|
||||
private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
|
||||
private static boolean sleepBetweenRows = false;
|
||||
private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
|
||||
private static volatile boolean sleepBetweenRows = false;
|
||||
|
||||
// The sleep for column families can be initiated before or after we fetch the cells for the
|
||||
// column family. If the sleep occurs BEFORE then the time limits will be reached inside
|
||||
// StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
|
||||
// limit will be reached inside RegionScanner after all the cells for a column family have been
|
||||
// retrieved.
|
||||
private static boolean sleepBeforeColumnFamily = false;
|
||||
private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
|
||||
private static boolean sleepBetweenColumnFamilies = false;
|
||||
private static volatile boolean sleepBeforeColumnFamily = false;
|
||||
private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
|
||||
private static volatile boolean sleepBetweenColumnFamilies = false;
|
||||
|
||||
public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
|
||||
HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
|
||||
|
@ -472,20 +459,14 @@ public class TestScannerHeartbeatMessages {
|
|||
}
|
||||
|
||||
private static void columnFamilySleep() {
|
||||
if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
|
||||
try {
|
||||
Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
if (sleepBetweenColumnFamilies) {
|
||||
Threads.sleepWithoutInterrupt(columnFamilySleepTime);
|
||||
}
|
||||
}
|
||||
|
||||
private static void rowSleep() {
|
||||
try {
|
||||
if (HeartbeatHRegion.sleepBetweenRows) {
|
||||
Thread.sleep(HeartbeatHRegion.rowSleepTime);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (sleepBetweenRows) {
|
||||
Threads.sleepWithoutInterrupt(rowSleepTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -514,8 +495,7 @@ public class TestScannerHeartbeatMessages {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
|
||||
throws IOException {
|
||||
public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
|
||||
boolean moreRows = super.nextRaw(outResults, context);
|
||||
HeartbeatHRegion.rowSleep();
|
||||
return moreRows;
|
||||
|
@ -542,8 +522,7 @@ public class TestScannerHeartbeatMessages {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
|
||||
throws IOException {
|
||||
public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
|
||||
boolean moreRows = super.nextRaw(outResults, context);
|
||||
HeartbeatHRegion.rowSleep();
|
||||
return moreRows;
|
||||
|
@ -575,8 +554,7 @@ public class TestScannerHeartbeatMessages {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<Cell> result, ScannerContext context)
|
||||
throws IOException {
|
||||
public boolean next(List<Cell> result, ScannerContext context) throws IOException {
|
||||
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
|
||||
boolean moreRows = super.next(result, context);
|
||||
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
|
||||
|
@ -589,14 +567,13 @@ public class TestScannerHeartbeatMessages {
|
|||
* cells.
|
||||
*/
|
||||
private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
|
||||
public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
|
||||
public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
|
||||
KVComparator comparator) throws IOException {
|
||||
super(scanners, comparator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<Cell> result, ScannerContext context)
|
||||
throws IOException {
|
||||
public boolean next(List<Cell> result, ScannerContext context) throws IOException {
|
||||
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
|
||||
boolean moreRows = super.next(result, context);
|
||||
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
|
||||
|
|
Loading…
Reference in New Issue