From a1f8821b85b92c0df63f10a3fee89a1c67a0ea1c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 14 Dec 2017 15:59:41 +0800 Subject: [PATCH] HBASE-19510 TestDistributedLogSplitting is flakey for AsyncFSWAL --- ...LogSplitting.java => AbstractTestDLS.java} | 333 ++++++++---------- .../hbase/master/TestDLSAsyncFSWAL.java | 31 ++ .../hadoop/hbase/master/TestDLSFSHLog.java | 31 ++ 3 files changed, 208 insertions(+), 187 deletions(-) rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{TestDistributedLogSplitting.java => AbstractTestDLS.java} (74%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSAsyncFSWAL.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSFSHLog.java diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java similarity index 74% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index ea184afef5d..d74815c482d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -51,11 +51,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -76,21 +73,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; @@ -99,111 +91,94 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -@Category({MasterTests.class, LargeTests.class}) -@SuppressWarnings("deprecation") -public class TestDistributedLogSplitting { +/** + * Base class for testing distributed log splitting. + */ +public abstract class AbstractTestDLS { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); - static { - // Uncomment the following line if more verbosity is needed for - // debugging (see HBASE-12285 for details). - //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); - // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this - // turns it off for this test. TODO: Figure out why scr breaks recovery. - System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - } + // Start a cluster with 2 masters and 5 regionservers + private static final int NUM_MASTERS = 2; + private static final int NUM_RS = 5; + private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); @Rule public TestName testName = new TestName(); - TableName tableName; - // Start a cluster with 2 masters and 6 regionservers - static final int NUM_MASTERS = 2; - static final int NUM_RS = 5; - static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); - - MiniHBaseCluster cluster; - HMaster master; - Configuration conf; - static Configuration originalConf; - static HBaseTestingUtility TEST_UTIL; - static MiniZooKeeperCluster zkCluster; + private TableName tableName; + private MiniHBaseCluster cluster; + private HMaster master; + private Configuration conf; @Rule public TestName name = new TestName(); @BeforeClass public static void setup() throws Exception { - TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create()); - zkCluster = TEST_UTIL.startMiniZKCluster(); - originalConf = TEST_UTIL.getConfiguration(); + // Uncomment the following line if more verbosity is needed for + // debugging (see HBASE-12285 for details). + // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); + TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniDFSCluster(3); } @AfterClass - public static void tearDown() throws IOException { - TEST_UTIL.shutdownMiniZKCluster(); - TEST_UTIL.shutdownMiniDFSCluster(); - TEST_UTIL.shutdownMiniHBaseCluster(); + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); } - private void startCluster(int num_rs) throws Exception { + protected abstract String getWalProvider(); + + private void startCluster(int numRS) throws Exception { SplitLogCounters.resetCounters(); LOG.info("Starting cluster"); - conf.getLong("hbase.splitlog.max.resubmit", 0); + conf.setLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing conf.setInt("hbase.regionserver.wal.max.splitters", 3); conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); - TEST_UTIL.shutdownMiniHBaseCluster(); - TEST_UTIL = new HBaseTestingUtility(conf); - TEST_UTIL.setZkCluster(zkCluster); - TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs); + conf.set("hbase.wal.provider", getWalProvider()); + TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS); cluster = TEST_UTIL.getHBaseCluster(); LOG.info("Waiting for active/ready master"); cluster.waitForActiveAndReadyMaster(); master = cluster.getMaster(); - while (cluster.getLiveRegionServerThreads().size() < num_rs) { - Threads.sleep(10); - } + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return cluster.getLiveRegionServerThreads().size() >= numRS; + } + }); } @Before public void before() throws Exception { - // refresh configuration - conf = HBaseConfiguration.create(originalConf); + conf = TEST_UTIL.getConfiguration(); tableName = TableName.valueOf(testName.getMethodName()); } @After public void after() throws Exception { - try { - if (TEST_UTIL.getHBaseCluster() != null) { - for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) { - mt.getMaster().abort("closing...", null); - } - } - TEST_UTIL.shutdownMiniHBaseCluster(); - } finally { - TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); - ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); - } + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); + ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); } - @Test (timeout=300000) + @Test(timeout = 300000) public void testRecoveredEdits() throws Exception { conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal startCluster(NUM_RS); - final int NUM_LOG_LINES = 10000; - final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); + int numLogLines = 10000; + SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); @@ -214,8 +189,8 @@ public class TestDistributedLogSplitting { Path rootdir = FSUtils.getRootDir(conf); int numRegions = 50; - Table t = installTable(new ZKWatcher(conf, "table-creation", null), numRegions); - try { + try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); + Table t = installTable(zkw, numRegions)) { TableName table = t.getName(); List regions = null; HRegionServer hrs = null; @@ -223,10 +198,12 @@ public class TestDistributedLogSplitting { hrs = rsts.get(i).getRegionServer(); regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); // At least one RS will have >= to average number of regions. - if (regions.size() >= numRegions/NUM_RS) break; + if (regions.size() >= numRegions / NUM_RS) { + break; + } } - final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs - .getServerName().toString())); + Path logDir = new Path(rootdir, + AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); LOG.info("#regions = " + regions.size()); Iterator it = regions.iterator(); @@ -238,15 +215,16 @@ public class TestDistributedLogSplitting { } } - makeWAL(hrs, regions, NUM_LOG_LINES, 100); + makeWAL(hrs, regions, numLogLines, 100); slm.splitLogDistributed(logDir); int count = 0; for (RegionInfo hri : regions) { Path tdir = FSUtils.getTableDir(rootdir, table); - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir( - HRegion.getRegionDir(tdir, hri.getEncodedName())); + @SuppressWarnings("deprecation") + Path editsdir = WALSplitter + .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override @@ -258,8 +236,8 @@ public class TestDistributedLogSplitting { } }); assertTrue( - "edits dir should have more than a single file in it. instead has " + files.length, - files.length > 1); + "edits dir should have more than a single file in it. instead has " + files.length, + files.length > 1); for (int i = 0; i < files.length; i++) { int c = countWAL(files[i].getPath(), fs, conf); count += c; @@ -269,9 +247,7 @@ public class TestDistributedLogSplitting { // check that the log file is moved assertFalse(fs.exists(logDir)); - assertEquals(NUM_LOG_LINES, count); - } finally { - if (t != null) t.close(); + assertEquals(numLogLines, count); } } @@ -280,18 +256,17 @@ public class TestDistributedLogSplitting { conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); startCluster(NUM_RS); - final int NUM_REGIONS_TO_CREATE = 40; - final int NUM_LOG_LINES = 1000; + int numRegionsToCreate = 40; + int numLogLines = 1000; // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); - final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); - Table ht = installTable(zkw, NUM_REGIONS_TO_CREATE); - try { + try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, numRegionsToCreate);) { HRegionServer hrs = findRSToKill(false); List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); - makeWAL(hrs, regions, NUM_LOG_LINES, 100); + makeWAL(hrs, regions, numLogLines, 100); // abort master abortMaster(cluster); @@ -309,71 +284,68 @@ public class TestDistributedLogSplitting { }); Thread.sleep(2000); - LOG.info("Current Open Regions:" - + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); // wait for abort completes TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { - return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() - >= (NUM_REGIONS_TO_CREATE + 1)); + return (HBaseTestingUtility.getAllOnlineRegions(cluster) + .size() >= (numRegionsToCreate + 1)); } }); - LOG.info("Current Open Regions After Master Node Starts Up:" - + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + LOG.info("Current Open Regions After Master Node Starts Up:" + + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); - assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); - } finally { - if (ht != null) ht.close(); - if (zkw != null) zkw.close(); + assertEquals(numLogLines, TEST_UTIL.countRows(ht)); } } /** - * The original intention of this test was to force an abort of a region - * server and to make sure that the failure path in the region servers is - * properly evaluated. But it is difficult to ensure that the region server - * doesn't finish the log splitting before it aborts. Also now, there is - * this code path where the master will preempt the region server when master - * detects that the region server has aborted. + * The original intention of this test was to force an abort of a region server and to make sure + * that the failure path in the region servers is properly evaluated. But it is difficult to + * ensure that the region server doesn't finish the log splitting before it aborts. Also now, + * there is this code path where the master will preempt the region server when master detects + * that the region server has aborted. * @throws Exception */ // Was marked flaky before Distributed Log Replay cleanup. - @Test (timeout=300000) + @Test(timeout = 300000) public void testWorkerAbort() throws Exception { LOG.info("testWorkerAbort"); startCluster(3); - final int NUM_LOG_LINES = 10000; - final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); + int numLogLines = 10000; + SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); FileSystem fs = master.getMasterFileSystem().getFileSystem(); - final List rsts = cluster.getLiveRegionServerThreads(); + List rsts = cluster.getLiveRegionServerThreads(); HRegionServer hrs = findRSToKill(false); Path rootdir = FSUtils.getRootDir(conf); final Path logDir = new Path(rootdir, - AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); + AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); - Table t = installTable(new ZKWatcher(conf, "table-creation", null), 40); - try { - makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), NUM_LOG_LINES, 100); + try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); + Table t = installTable(zkw, 40)) { + makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); new Thread() { @Override public void run() { - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + try { + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + } catch (InterruptedException e) { + } for (RegionServerThread rst : rsts) { rst.getRegionServer().abort("testing"); break; } } }.start(); - // slm.splitLogDistributed(logDir); FileStatus[] logfiles = fs.listStatus(logDir); TaskBatch batch = new TaskBatch(); slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); - //waitForCounter but for one of the 2 counters + // waitForCounter but for one of the 2 counters long curt = System.currentTimeMillis(); long waitTime = 80000; long endt = curt + waitTime; @@ -381,7 +353,7 @@ public class TestDistributedLogSplitting { if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + tot_wkr_preempt_task.sum()) == 0) { - Thread.yield(); + Thread.sleep(100); curt = System.currentTimeMillis(); } else { assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + @@ -390,29 +362,23 @@ public class TestDistributedLogSplitting { return; } } - fail("none of the following counters went up in " + waitTime + - " milliseconds - " + + fail("none of the following counters went up in " + waitTime + " milliseconds - " + "tot_wkr_task_resigned, tot_wkr_task_err, " + - "tot_wkr_final_transition_failed, tot_wkr_task_done, " + - "tot_wkr_preempt_task"); - } finally { - if (t != null) t.close(); + "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); } } - @Test (timeout=300000) + @Test(timeout = 300000) public void testThreeRSAbort() throws Exception { LOG.info("testThreeRSAbort"); - final int NUM_REGIONS_TO_CREATE = 40; - final int NUM_ROWS_PER_REGION = 100; + int numRegionsToCreate = 40; + int numRowsPerRegion = 100; startCluster(NUM_RS); // NUM_RS=6. - final ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); - - Table table = installTable(zkw, NUM_REGIONS_TO_CREATE); - try { - populateDataInTable(NUM_ROWS_PER_REGION); + try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); + Table table = installTable(zkw, numRegionsToCreate)) { + populateDataInTable(numRowsPerRegion); List rsts = cluster.getLiveRegionServerThreads(); assertEquals(NUM_RS, rsts.size()); @@ -420,22 +386,24 @@ public class TestDistributedLogSplitting { cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); - long start = EnvironmentEdgeManager.currentTime(); - while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) { - if (EnvironmentEdgeManager.currentTime() - start > 60000) { - fail("Timed out waiting for server aborts."); + TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; } - Thread.sleep(200); - } + + @Override + public String explainFailure() throws Exception { + return "Timed out waiting for server aborts."; + } + }); TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION, TEST_UTIL.countRows(table)); - } finally { - if (table != null) table.close(); - if (zkw != null) zkw.close(); + assertEquals(numRegionsToCreate * numRowsPerRegion, TEST_UTIL.countRows(table)); } } - @Test(timeout=30000) + @Test(timeout = 30000) public void testDelayedDeleteOnFailure() throws Exception { LOG.info("testDelayedDeleteOnFailure"); startCluster(1); @@ -511,12 +479,13 @@ public class TestDistributedLogSplitting { Table ht = installTable(zkw, 10); try { FileSystem fs = master.getMasterFileSystem().getFileSystem(); - Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName())); + Path tableDir = + FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName())); List regionDirs = FSUtils.getRegionDirs(fs, tableDir); long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); - WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); assertEquals(newSeqId + 2000, - WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { @@ -538,13 +507,13 @@ public class TestDistributedLogSplitting { } } - Table installTable(ZKWatcher zkw, int nrs) throws Exception { + private Table installTable(ZKWatcher zkw, int nrs) throws Exception { return installTable(zkw, nrs, 0); } - Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception { + private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception { // Create a table with regions - byte [] family = Bytes.toBytes("family"); + byte[] family = Bytes.toBytes("family"); LOG.info("Creating table with " + nrs + " regions"); Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); int numRegions = -1; @@ -588,8 +557,8 @@ public class TestDistributedLogSplitting { if (hri.getTable().isSystemTable()) { continue; } - LOG.debug("adding data to rs = " + rst.getName() + - " region = "+ hri.getRegionNameAsString()); + LOG.debug( + "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); Region region = hrs.getOnlineRegion(hri.getRegionName()); assertTrue(region != null); putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); @@ -609,8 +578,8 @@ public class TestDistributedLogSplitting { if (hri.getTable().isSystemTable()) { continue; } - LOG.debug("adding data to rs = " + mt.getName() + - " region = "+ hri.getRegionNameAsString()); + LOG.debug( + "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); Region region = hrs.getOnlineRegion(hri.getRegionName()); assertTrue(region != null); putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); @@ -623,20 +592,18 @@ public class TestDistributedLogSplitting { makeWAL(hrs, regions, num_edits, edit_size, true); } - public void makeWAL(HRegionServer hrs, List regions, - int num_edits, int edit_size, boolean cleanShutdown) throws IOException { + public void makeWAL(HRegionServer hrs, List regions, int numEdits, int editSize, + boolean cleanShutdown) throws IOException { // remove root and meta region regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); - for(Iterator iter = regions.iterator(); iter.hasNext(); ) { + for (Iterator iter = regions.iterator(); iter.hasNext();) { RegionInfo regionInfo = iter.next(); - if(regionInfo.getTable().isSystemTable()) { + if (regionInfo.getTable().isSystemTable()) { iter.remove(); } } - HTableDescriptor htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY)); - byte[] value = new byte[edit_size]; + byte[] value = new byte[editSize]; List hris = new ArrayList<>(); for (RegionInfo region : regions) { @@ -646,19 +613,19 @@ public class TestDistributedLogSplitting { hris.add(region); } LOG.info("Creating wal edits across " + hris.size() + " regions."); - for (int i = 0; i < edit_size; i++) { + for (int i = 0; i < editSize; i++) { value[i] = (byte) ('a' + (i % 26)); } int n = hris.size(); int[] counts = new int[n]; // sync every ~30k to line up with desired wal rolls - final int syncEvery = 30 * 1024 / edit_size; + final int syncEvery = 30 * 1024 / editSize; MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); if (n > 0) { - for (int i = 0; i < num_edits; i += 1) { + for (int i = 0; i < numEdits; i += 1) { WALEdit e = new WALEdit(); RegionInfo curRegionInfo = hris.get(i % n); - final WAL log = hrs.getWAL(curRegionInfo); + WAL log = hrs.getWAL(curRegionInfo); byte[] startRow = curRegionInfo.getStartKey(); if (startRow == null || startRow.length == 0) { startRow = new byte[] { 0, 0, 0, 0, 1 }; @@ -668,9 +635,9 @@ public class TestDistributedLogSplitting { // HBaseTestingUtility.createMultiRegions use 5 bytes key byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); - log.append(curRegionInfo, - new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), e, true); + log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc), + e, true); if (0 == i % syncEvery) { log.sync(); } @@ -680,12 +647,12 @@ public class TestDistributedLogSplitting { // done as two passes because the regions might share logs. shutdown is idempotent, but sync // will cause errors if done after. for (RegionInfo info : hris) { - final WAL log = hrs.getWAL(info); + WAL log = hrs.getWAL(info); log.sync(); } if (cleanShutdown) { for (RegionInfo info : hris) { - final WAL log = hrs.getWAL(info); + WAL log = hrs.getWAL(info); log.shutdown(); } } @@ -695,24 +662,15 @@ public class TestDistributedLogSplitting { return; } - private int countWAL(Path log, FileSystem fs, Configuration conf) - throws IOException { + private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { int count = 0; - WAL.Reader in = WALFactory.createReader(fs, log, conf); - try { + try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { WAL.Entry e; while ((e = in.next()) != null) { if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { count++; } } - } finally { - try { - in.close(); - } catch (IOException exception) { - LOG.warn("Problem closing wal: " + exception.getMessage()); - LOG.debug("exception details.", exception); - } } return count; } @@ -721,32 +679,31 @@ public class TestDistributedLogSplitting { TEST_UTIL.waitUntilNoRegionsInTransition(60000); } - private void putData(Region region, byte[] startRow, int numRows, byte [] qf, - byte [] ...families) + private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) throws IOException { - for(int i = 0; i < numRows; i++) { + for (int i = 0; i < numRows; i++) { Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); - for(byte [] family : families) { + for (byte[] family : families) { put.addColumn(family, qf, null); } region.put(put); } } - private void waitForCounter(LongAdder ctr, long oldval, long newval, - long timems) { + private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) + throws InterruptedException { long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { if (ctr.sum() == oldval) { - Thread.yield(); + Thread.sleep(100); curt = System.currentTimeMillis(); } else { assertEquals(newval, ctr.sum()); return; } } - assertTrue(false); + fail(); } private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { @@ -769,7 +726,7 @@ public class TestDistributedLogSplitting { List regions = null; HRegionServer hrs = null; - for (RegionServerThread rst: rsts) { + for (RegionServerThread rst : rsts) { hrs = rst.getRegionServer(); while (rst.isAlive() && !hrs.isOnline()) { Thread.sleep(100); @@ -794,14 +751,14 @@ public class TestDistributedLogSplitting { if (isCarryingMeta && hasMetaRegion) { // clients ask for a RS with META if (!foundTableRegion) { - final HRegionServer destRS = hrs; + HRegionServer destRS = hrs; // the RS doesn't have regions of the specified table so we need move one to this RS List tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); - final RegionInfo hri = tableRegions.get(0); + RegionInfo hri = tableRegions.get(0); TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), - Bytes.toBytes(destRS.getServerName().getServerName())); + Bytes.toBytes(destRS.getServerName().getServerName())); // wait for region move completes - final RegionStates regionStates = + RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate() { @Override @@ -815,7 +772,9 @@ public class TestDistributedLogSplitting { } else if (hasMetaRegion || isCarryingMeta) { continue; } - if (foundTableRegion) break; + if (foundTableRegion) { + break; + } } return hrs; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSAsyncFSWAL.java new file mode 100644 index 00000000000..36e74564333 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSAsyncFSWAL.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestDLSAsyncFSWAL extends AbstractTestDLS { + + @Override + protected String getWalProvider() { + return "asyncfs"; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSFSHLog.java new file mode 100644 index 00000000000..e83834f079a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDLSFSHLog.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestDLSFSHLog extends AbstractTestDLS { + + @Override + protected String getWalProvider() { + return "filesystem"; + } +}