diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 6c0c079955f..4bba248b128 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -334,78 +334,80 @@ public class TestPerColumnFamilyFlush { conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000); final int numRegionServers = 4; - TEST_UTIL.startMiniCluster(numRegionServers); - TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); - HTable table = TEST_UTIL.createTable(TABLENAME, families); - HTableDescriptor htd = table.getTableDescriptor(); + try { + TEST_UTIL.startMiniCluster(numRegionServers); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTableDescriptor htd = table.getTableDescriptor(); - for (byte[] family : families) { - if (!htd.hasFamily(family)) { - htd.addFamily(new HColumnDescriptor(family)); + for (byte[] family : families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } } - } - // Add 100 edits for CF1, 20 for CF2, 20 for CF3. - // These will all be interleaved in the log. - for (int i = 1; i <= 80; i++) { - table.put(createPut(1, i)); - if (i <= 10) { - table.put(createPut(2, i)); - table.put(createPut(3, i)); + // Add 100 edits for CF1, 20 for CF2, 20 for CF3. + // These will all be interleaved in the log. + for (int i = 1; i <= 80; i++) { + table.put(createPut(1, i)); + if (i <= 10) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } } - } - table.flushCommits(); - Thread.sleep(1000); + table.flushCommits(); + Thread.sleep(1000); - Pair desiredRegionAndServer = getRegionWithName(TABLENAME); - HRegion desiredRegion = desiredRegionAndServer.getFirst(); - assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); + Pair desiredRegionAndServer = getRegionWithName(TABLENAME); + HRegion desiredRegion = desiredRegionAndServer.getFirst(); + assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); - // Flush the region selectively. - desiredRegion.flushcache(false); + // Flush the region selectively. + desiredRegion.flushcache(false); - long totalMemstoreSize; - long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; - totalMemstoreSize = desiredRegion.getMemstoreSize().get(); + long totalMemstoreSize; + long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; + totalMemstoreSize = desiredRegion.getMemstoreSize().get(); - // Find the sizes of the memstores of each CF. - cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); - cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); - cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); + // Find the sizes of the memstores of each CF. + cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); - // CF1 Should have been flushed - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); - // CF2 and CF3 shouldn't have been flushed. - assertTrue(cf2MemstoreSize > 0); - assertTrue(cf3MemstoreSize > 0); - assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize - + cf3MemstoreSize); + // CF1 Should have been flushed + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // CF2 and CF3 shouldn't have been flushed. + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); - // Wait for the RS report to go across to the master, so that the master - // is aware of which sequence ids have been flushed, before we kill the RS. - // If in production, the RS dies before the report goes across, we will - // safely replay all the edits. - Thread.sleep(2000); + // Wait for the RS report to go across to the master, so that the master + // is aware of which sequence ids have been flushed, before we kill the RS. + // If in production, the RS dies before the report goes across, we will + // safely replay all the edits. + Thread.sleep(2000); - // Abort the region server where we have the region hosted. - HRegionServer rs = desiredRegionAndServer.getSecond(); - rs.abort("testing"); + // Abort the region server where we have the region hosted. + HRegionServer rs = desiredRegionAndServer.getSecond(); + rs.abort("testing"); - // The aborted region server's regions will be eventually assigned to some - // other region server, and the get RPC call (inside verifyEdit()) will - // retry for some time till the regions come back up. + // The aborted region server's regions will be eventually assigned to some + // other region server, and the get RPC call (inside verifyEdit()) will + // retry for some time till the regions come back up. - // Verify that all the edits are safe. - for (int i = 1; i <= 80; i++) { - verifyEdit(1, i, table); - if (i <= 10) { - verifyEdit(2, i, table); - verifyEdit(3, i, table); + // Verify that all the edits are safe. + for (int i = 1; i <= 80; i++) { + verifyEdit(1, i, table); + if (i <= 10) { + verifyEdit(2, i, table); + verifyEdit(3, i, table); + } } + } finally { + TEST_UTIL.shutdownMiniCluster(); } - - TEST_UTIL.shutdownMiniCluster(); } // Test Log Replay with Distributed Replay on. @@ -426,6 +428,7 @@ public class TestPerColumnFamilyFlush { */ @Test public void testFlushingWhenLogRolling() throws Exception { + TableName tableName = TableName.valueOf("testFlushingWhenLogRolling"); Configuration conf = TEST_UTIL.getConfiguration(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); @@ -439,55 +442,52 @@ public class TestPerColumnFamilyFlush { int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); final int numRegionServers = 4; - TEST_UTIL.startMiniCluster(numRegionServers); - TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); - HTable table = TEST_UTIL.createTable(TABLENAME, families); - HTableDescriptor htd = table.getTableDescriptor(); + try { + TEST_UTIL.startMiniCluster(numRegionServers); + HTable table = TEST_UTIL.createTable(tableName, families); + HRegion desiredRegion = getRegionWithName(tableName).getFirst(); + assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); + LOG.info("Writing to region=" + desiredRegion); - for (byte[] family : families) { - if (!htd.hasFamily(family)) { - htd.addFamily(new HColumnDescriptor(family)); + // Add some edits. Most will be for CF1, some for CF2 and CF3. + for (int i = 1; i <= 10000; i++) { + table.put(createPut(1, i)); + if (i <= 200) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + table.flushCommits(); + // Keep adding until we exceed the number of log files, so that we are + // able to trigger the cleaning of old log files. + int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles(); + if (currentNumLogFiles > maxLogs) { + LOG.info("The number of log files is now: " + currentNumLogFiles + + ". Expect a log roll and memstore flush."); + break; + } } + table.close(); + // Wait for some time till the flush caused by log rolling happens. + Thread.sleep(4000); + LOG.info("Finished waiting on flush after too many WALs..."); + + // We have artificially created the conditions for a log roll. When a + // log roll happens, we should flush all the column families. Testing that + // case here. + + // Individual families should have been flushed. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, + desiredRegion.getStore(FAMILY1).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, + desiredRegion.getStore(FAMILY2).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, + desiredRegion.getStore(FAMILY3).getMemStoreSize()); + + // And of course, the total memstore should also be clean. + assertEquals(0, desiredRegion.getMemstoreSize().get()); + } finally { + TEST_UTIL.shutdownMiniCluster(); } - - HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst(); - assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); - - // Add some edits. Most will be for CF1, some for CF2 and CF3. - for (int i = 1; i <= 10000; i++) { - table.put(createPut(1, i)); - if (i <= 200) { - table.put(createPut(2, i)); - table.put(createPut(3, i)); - } - table.flushCommits(); - // Keep adding until we exceed the number of log files, so that we are - // able to trigger the cleaning of old log files. - int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles(); - if (currentNumLogFiles > maxLogs) { - LOG.info("The number of log files is now: " + currentNumLogFiles - + ". Expect a log roll and memstore flush."); - break; - } - } - table.close(); - // Wait for some time till the flush caused by log rolling happens. - Thread.sleep(4000); - - // We have artificially created the conditions for a log roll. When a - // log roll happens, we should flush all the column families. Testing that - // case here. - - // Individual families should have been flushed. - assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize()); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize()); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize()); - - // And of course, the total memstore should also be clean. - assertEquals(0, desiredRegion.getMemstoreSize().get()); - - TEST_UTIL.shutdownMiniCluster(); } private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException { @@ -535,40 +535,52 @@ public class TestPerColumnFamilyFlush { htd.addFamily(new HColumnDescriptor(FAMILY3)); LOG.info("==============Test with selective flush disabled==============="); - TEST_UTIL.startMiniCluster(1); - TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); - TEST_UTIL.getHBaseAdmin().createTable(htd); - TEST_UTIL.waitTableAvailable(TABLENAME); - Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(TABLENAME); - doPut(table, memstoreFlushSize); - table.close(); - conn.close(); + int cf1StoreFileCount = -1; + int cf2StoreFileCount = -1; + int cf3StoreFileCount = -1; + int cf1StoreFileCount1 = -1; + int cf2StoreFileCount1 = -1; + int cf3StoreFileCount1 = -1; + try { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(TABLENAME); + Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(TABLENAME); + doPut(table, memstoreFlushSize); + table.close(); + conn.close(); - HRegion region = getRegionWithName(TABLENAME).getFirst(); - int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); - int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); - int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); - TEST_UTIL.shutdownMiniCluster(); + HRegion region = getRegionWithName(TABLENAME).getFirst(); + cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); + cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); + cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } - LOG.info("==============Test with selective flush enabled==============="); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); - TEST_UTIL.startMiniCluster(1); - TEST_UTIL.getHBaseAdmin().createNamespace( - NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); - TEST_UTIL.getHBaseAdmin().createTable(htd); - conn = ConnectionFactory.createConnection(conf); - table = conn.getTable(TABLENAME); - doPut(table, memstoreFlushSize); - table.close(); - conn.close(); + LOG.info("==============Test with selective flush enabled==============="); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); + try { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(TABLENAME); + doPut(table, memstoreFlushSize); + table.close(); + conn.close(); - region = getRegionWithName(TABLENAME).getFirst(); - int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); - int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); - int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); - TEST_UTIL.shutdownMiniCluster(); + region = getRegionWithName(TABLENAME).getFirst(); + cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); + cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); + cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "