diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 2d9ba6ee88d..b0314138038 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -55,6 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.log4j.Level; import org.junit.After; @@ -64,9 +68,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Here we test to make sure that scans return the expected Results when the server is sending the * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent @@ -113,12 +114,10 @@ public class TestScannerHeartbeatMessages { // In this test, we sleep after reading each row. So we should make sure after we get some number // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping. - // So set this to 200, we will get 3 rows and reach time limit at the start of 4th row, then sleep - // for the 4th time. Total time is 800 ms so we will not timeout. - private static int DEFAULT_ROW_SLEEP_TIME = 200; + private static int DEFAULT_ROW_SLEEP_TIME = 300; // Similar with row sleep time. - private static int DEFAULT_CF_SLEEP_TIME = 200; + private static int DEFAULT_CF_SLEEP_TIME = 300; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -178,7 +177,6 @@ public class TestScannerHeartbeatMessages { @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.deleteTable(TABLE_NAME); TEST_UTIL.shutdownMiniCluster(); } @@ -192,19 +190,6 @@ public class TestScannerHeartbeatMessages { disableSleeping(); } - /** - * Test a variety of scan configurations to ensure that they return the expected Results when - * heartbeat messages are necessary. These tests are accumulated under one test case to ensure - * that they don't run in parallel. If the tests ran in parallel, they may conflict with each - * other due to changing static variables - */ - @Test - public void testScannerHeartbeatMessages() throws Exception { - testImportanceOfHeartbeats(testHeartbeatBetweenRows()); - testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); - testImportanceOfHeartbeats(testHeartbeatWithSparseFilter()); - } - /** * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are @@ -212,7 +197,7 @@ public class TestScannerHeartbeatMessages { * @param testCallable * @throws InterruptedException */ - public void testImportanceOfHeartbeats(Callable testCallable) throws InterruptedException { + private void testImportanceOfHeartbeats(Callable testCallable) throws InterruptedException { HeartbeatRPCServices.heartbeatsEnabled = true; try { @@ -239,8 +224,9 @@ public class TestScannerHeartbeatMessages { * fetched. * @throws Exception */ - public Callable testHeartbeatBetweenRows() throws Exception { - return new Callable() { + @Test + public void testHeartbeatBetweenRows() throws Exception { + testImportanceOfHeartbeats(new Callable() { @Override public Void call() throws Exception { @@ -253,15 +239,16 @@ public class TestScannerHeartbeatMessages { testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); return null; } - }; + }); } /** * Test the case that the time limit for scans is reached in between column families * @throws Exception */ - public Callable testHeartbeatBetweenColumnFamilies() throws Exception { - return new Callable() { + @Test + public void testHeartbeatBetweenColumnFamilies() throws Exception { + testImportanceOfHeartbeats(new Callable() { @Override public Void call() throws Exception { // Configure the scan so that it can read the entire table in a single RPC. We want to test @@ -278,24 +265,23 @@ public class TestScannerHeartbeatMessages { testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); return null; } - }; + }); } - public static class SparseFilter extends FilterBase{ + public static class SparseFilter extends FilterBase { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { try { - Thread.sleep(CLIENT_TIMEOUT/2 + 10); + Thread.sleep(CLIENT_TIMEOUT / 2 + 100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? - ReturnCode.INCLUDE : - ReturnCode.SKIP; + return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE + : ReturnCode.SKIP; } - public static Filter parseFrom(final byte [] pbBytes){ + public static Filter parseFrom(final byte[] pbBytes) { return new SparseFilter(); } } @@ -304,8 +290,9 @@ public class TestScannerHeartbeatMessages { * Test the case that there is a filter which filters most of cells * @throws Exception */ - public Callable testHeartbeatWithSparseFilter() throws Exception { - return new Callable() { + @Test + public void testHeartbeatWithSparseFilter() throws Exception { + testImportanceOfHeartbeats(new Callable() { @Override public Void call() throws Exception { Scan scan = new Scan(); @@ -335,7 +322,7 @@ public class TestScannerHeartbeatMessages { return null; } - }; + }); } /** @@ -348,7 +335,7 @@ public class TestScannerHeartbeatMessages { * that column family are fetched * @throws Exception */ - public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, + private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) throws Exception { disableSleeping(); final ResultScanner scanner = TABLE.getScanner(scan); @@ -423,14 +410,14 @@ public class TestScannerHeartbeatMessages { * Custom RSRpcServices instance that allows heartbeat support to be toggled */ private static class HeartbeatRPCServices extends RSRpcServices { - private static boolean heartbeatsEnabled = true; + private static volatile boolean heartbeatsEnabled = true; public HeartbeatRPCServices(HRegionServer rs) throws IOException { super(rs); } @Override - public ScanResponse scan(RpcController controller, ScanRequest request) + public ScanResponse scan(RpcController controller, ScanRequest request) throws ServiceException { ScanRequest.Builder builder = ScanRequest.newBuilder(request); builder.setClientHandlesHeartbeats(heartbeatsEnabled); @@ -445,17 +432,17 @@ public class TestScannerHeartbeatMessages { */ private static class HeartbeatHRegion extends HRegion { // Row sleeps occur AFTER each row worth of cells is retrieved. - private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; - private static boolean sleepBetweenRows = false; + private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; + private static volatile boolean sleepBetweenRows = false; // The sleep for column families can be initiated before or after we fetch the cells for the // column family. If the sleep occurs BEFORE then the time limits will be reached inside // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time // limit will be reached inside RegionScanner after all the cells for a column family have been // retrieved. - private static boolean sleepBeforeColumnFamily = false; - private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; - private static boolean sleepBetweenColumnFamilies = false; + private static volatile boolean sleepBeforeColumnFamily = false; + private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; + private static volatile boolean sleepBetweenColumnFamilies = false; public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) { @@ -468,20 +455,14 @@ public class TestScannerHeartbeatMessages { } private static void columnFamilySleep() { - if (HeartbeatHRegion.sleepBetweenColumnFamilies) { - try { - Thread.sleep(HeartbeatHRegion.columnFamilySleepTime); - } catch (InterruptedException e) { - } + if (sleepBetweenColumnFamilies) { + Threads.sleepWithoutInterrupt(columnFamilySleepTime); } } private static void rowSleep() { - try { - if (HeartbeatHRegion.sleepBetweenRows) { - Thread.sleep(HeartbeatHRegion.rowSleepTime); - } - } catch (InterruptedException e) { + if (sleepBetweenRows) { + Threads.sleepWithoutInterrupt(rowSleepTime); } } @@ -510,8 +491,7 @@ public class TestScannerHeartbeatMessages { } @Override - public boolean nextRaw(List outResults, ScannerContext context) - throws IOException { + public boolean nextRaw(List outResults, ScannerContext context) throws IOException { boolean moreRows = super.nextRaw(outResults, context); HeartbeatHRegion.rowSleep(); return moreRows; @@ -538,8 +518,7 @@ public class TestScannerHeartbeatMessages { } @Override - public boolean nextRaw(List outResults, ScannerContext context) - throws IOException { + public boolean nextRaw(List outResults, ScannerContext context) throws IOException { boolean moreRows = super.nextRaw(outResults, context); HeartbeatHRegion.rowSleep(); return moreRows; @@ -571,8 +550,7 @@ public class TestScannerHeartbeatMessages { } @Override - public boolean next(List result, ScannerContext context) - throws IOException { + public boolean next(List result, ScannerContext context) throws IOException { if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); boolean moreRows = super.next(result, context); if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); @@ -585,14 +563,13 @@ public class TestScannerHeartbeatMessages { * cells. */ private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { - public HeartbeatReversedKVHeap(List scanners, + public HeartbeatReversedKVHeap(List scanners, CellComparator comparator) throws IOException { super(scanners, comparator); } @Override - public boolean next(List result, ScannerContext context) - throws IOException { + public boolean next(List result, ScannerContext context) throws IOException { if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); boolean moreRows = super.next(result, context); if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();