diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 4a9b38a161d..c131aa6bed9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -49,10 +51,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -61,7 +63,7 @@ import com.google.common.hash.Hashing; /** * This test verifies the correctness of the Per Column Family flushing strategy */ -@Category(LargeTests.class) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestPerColumnFamilyFlush { private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class); @@ -119,7 +121,7 @@ public class TestPerColumnFamilyFlush { Arrays.equals(r.getFamilyMap(family).get(qf), val)); } - @Test (timeout=180000) + @Test(timeout = 180000) public void testSelectiveFlushWhenEnabled() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); @@ -258,7 +260,7 @@ public class TestPerColumnFamilyFlush { HBaseTestingUtility.closeRegionAndWAL(region); } - @Test (timeout=180000) + @Test(timeout = 180000) public void testSelectiveFlushWhenNotEnabled() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); @@ -337,7 +339,7 @@ public class TestPerColumnFamilyFlush { try { TEST_UTIL.startMiniCluster(numRegionServers); TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES); HTableDescriptor htd = table.getTableDescriptor(); @@ -414,46 +416,50 @@ public class TestPerColumnFamilyFlush { // In distributed log replay, the log splitters ask the master for the // last flushed sequence id for a region. This test would ensure that we // are doing the book-keeping correctly. - @Test (timeout=180000) + @Test(timeout = 180000) public void testLogReplayWithDistributedReplay() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); doTestLogReplay(); } // Test Log Replay with Distributed log split on. - @Test (timeout=180000) + @Test(timeout = 180000) public void testLogReplayWithDistributedLogSplit() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); doTestLogReplay(); } + private int getNumRolledLogFiles(HRegion region) { + return ((FSHLog) region.getWAL()).getNumRolledLogFiles(); + } + /** * When a log roll is about to happen, we do a flush of the regions who will be affected by the * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This * test ensures that we do a full-flush in that scenario. * @throws IOException */ - @Test (timeout=180000) + @Test(timeout = 180000) public void testFlushingWhenLogRolling() throws Exception { TableName tableName = TableName.valueOf("testFlushingWhenLogRolling"); Configuration conf = TEST_UTIL.getConfiguration(); - conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); - conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000); + long cfFlushSizeLowerBound = 2048; + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, + cfFlushSizeLowerBound); - // Also, let us try real hard to get a log roll to happen. - // Keeping the log roll period to 2s. - conf.setLong("hbase.regionserver.logroll.period", 2000); - // Keep the block size small so that we fill up the log files very fast. - conf.setLong("hbase.regionserver.hlog.blocksize", 6144); + // One hour, prevent periodic rolling + conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000); + // prevent rolling by size + conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024); // Make it 10 as max logs before a flush comes on. - final int walcount = 10; - conf.setInt("hbase.regionserver.maxlogs", walcount); - int maxLogs = conf.getInt("hbase.regionserver.maxlogs", walcount); + final int maxLogs = 10; + conf.setInt("hbase.regionserver.maxlogs", maxLogs); - final int numRegionServers = 4; + final int numRegionServers = 1; + TEST_UTIL.startMiniCluster(numRegionServers); try { - TEST_UTIL.startMiniCluster(numRegionServers); HTable table = null; table = TEST_UTIL.createTable(tableName, FAMILIES); // Force flush the namespace table so edits to it are not hanging around as oldest @@ -462,36 +468,56 @@ public class TestPerColumnFamilyFlush { try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { admin.flush(TableName.NAMESPACE_TABLE_NAME); } - HRegion desiredRegion = getRegionWithName(tableName).getFirst(); + Pair desiredRegionAndServer = getRegionWithName(tableName); + final HRegion desiredRegion = desiredRegionAndServer.getFirst(); assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); LOG.info("Writing to region=" + desiredRegion); - // Add some edits. Most will be for CF1, some for CF2 and CF3. - for (int i = 1; i <= 10000; i++) { - table.put(createPut(1, i)); - if (i <= 200) { - table.put(createPut(2, i)); - table.put(createPut(3, i)); + // Add one row for both CFs. + for (int i = 1; i <= 3; i++) { + table.put(createPut(i, 0)); + } + // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower + // bound and CF2 and CF3 are smaller than the lower bound. + for (int i = 0; i < maxLogs; i++) { + for (int j = 0; j < 100; j++) { + table.put(createPut(1, i * 100 + j)); } table.flushCommits(); - // Keep adding until we exceed the number of log files, so that we are - // able to trigger the cleaning of old log files. - int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles(); - if (currentNumLogFiles > maxLogs) { - LOG.info("The number of log files is now: " + currentNumLogFiles - + ". Expect a log roll and memstore flush."); - break; + // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. + int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); + assertNull(desiredRegion.getWAL().rollWriter()); + while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) { + Thread.sleep(100); } } table.close(); + assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); + assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound); + assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound); + assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound); + table.put(createPut(1, 12345678)); + table.flushCommits(); + // Make numRolledLogFiles greater than maxLogs + desiredRegionAndServer.getSecond().walRoller.requestRollAll(); // Wait for some time till the flush caused by log rolling happens. - while (((FSHLog) (desiredRegion.getWAL())).getNumLogFiles() > maxLogs) Threads.sleep(100); + TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return desiredRegion.getMemstoreSize().get() == 0; + } + + @Override + public String explainFailure() throws Exception { + long memstoreSize = desiredRegion.getMemstoreSize().get(); + if (memstoreSize > 0) { + return "Still have unflushed entries in memstore, memstore size is " + memstoreSize; + } + return "Unknown"; + } + }); LOG.info("Finished waiting on flush after too many WALs..."); - - // We have artificially created the conditions for a log roll. When a - // log roll happens, we should flush all the column families. Testing that - // case here. - // Individual families should have been flushed. assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize()); @@ -499,9 +525,9 @@ public class TestPerColumnFamilyFlush { desiredRegion.getStore(FAMILY2).getMemStoreSize()); assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize()); - - // And of course, the total memstore should also be clean. - assertEquals(0, desiredRegion.getMemstoreSize().get()); + // let WAL cleanOldLogs + assertNull(desiredRegion.getWAL().rollWriter(true)); + assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); } finally { TEST_UTIL.shutdownMiniCluster(); } @@ -534,7 +560,7 @@ public class TestPerColumnFamilyFlush { // Under the same write load, small stores should have less store files when // percolumnfamilyflush enabled. - @Test (timeout=180000) + @Test(timeout = 180000) public void testCompareStoreFileCount() throws Exception { long memstoreFlushSize = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); @@ -561,7 +587,7 @@ public class TestPerColumnFamilyFlush { try { TEST_UTIL.startMiniCluster(1); TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); TEST_UTIL.getHBaseAdmin().createTable(htd); TEST_UTIL.waitTableAvailable(TABLENAME); Connection conn = ConnectionFactory.createConnection(conf); @@ -578,12 +604,12 @@ public class TestPerColumnFamilyFlush { TEST_UTIL.shutdownMiniCluster(); } - LOG.info("==============Test with selective flush enabled==============="); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); + LOG.info("==============Test with selective flush enabled==============="); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); try { TEST_UTIL.startMiniCluster(1); TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); TEST_UTIL.getHBaseAdmin().createTable(htd); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TABLENAME);