HBASE-12772 TestPerColumnFamilyFlush failing

This commit is contained in:
stack 2014-12-29 15:45:23 -08:00
parent 7527227ab9
commit 8ff62d9cee
1 changed files with 146 additions and 134 deletions

View File

@ -334,78 +334,80 @@ public class TestPerColumnFamilyFlush {
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
final int numRegionServers = 4; final int numRegionServers = 4;
TEST_UTIL.startMiniCluster(numRegionServers); try {
TEST_UTIL.getHBaseAdmin().createNamespace( TEST_UTIL.startMiniCluster(numRegionServers);
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); TEST_UTIL.getHBaseAdmin().createNamespace(
HTable table = TEST_UTIL.createTable(TABLENAME, families); NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
HTableDescriptor htd = table.getTableDescriptor(); HTable table = TEST_UTIL.createTable(TABLENAME, families);
HTableDescriptor htd = table.getTableDescriptor();
for (byte[] family : families) { for (byte[] family : families) {
if (!htd.hasFamily(family)) { if (!htd.hasFamily(family)) {
htd.addFamily(new HColumnDescriptor(family)); htd.addFamily(new HColumnDescriptor(family));
}
} }
}
// Add 100 edits for CF1, 20 for CF2, 20 for CF3. // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
// These will all be interleaved in the log. // These will all be interleaved in the log.
for (int i = 1; i <= 80; i++) { for (int i = 1; i <= 80; i++) {
table.put(createPut(1, i)); table.put(createPut(1, i));
if (i <= 10) { if (i <= 10) {
table.put(createPut(2, i)); table.put(createPut(2, i));
table.put(createPut(3, i)); table.put(createPut(3, i));
}
} }
} table.flushCommits();
table.flushCommits(); Thread.sleep(1000);
Thread.sleep(1000);
Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
HRegion desiredRegion = desiredRegionAndServer.getFirst(); HRegion desiredRegion = desiredRegionAndServer.getFirst();
assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
// Flush the region selectively. // Flush the region selectively.
desiredRegion.flushcache(false); desiredRegion.flushcache(false);
long totalMemstoreSize; long totalMemstoreSize;
long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
totalMemstoreSize = desiredRegion.getMemstoreSize().get(); totalMemstoreSize = desiredRegion.getMemstoreSize().get();
// Find the sizes of the memstores of each CF. // Find the sizes of the memstores of each CF.
cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
// CF1 Should have been flushed // CF1 Should have been flushed
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
// CF2 and CF3 shouldn't have been flushed. // CF2 and CF3 shouldn't have been flushed.
assertTrue(cf2MemstoreSize > 0); assertTrue(cf2MemstoreSize > 0);
assertTrue(cf3MemstoreSize > 0); assertTrue(cf3MemstoreSize > 0);
assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+ cf3MemstoreSize); + cf3MemstoreSize);
// Wait for the RS report to go across to the master, so that the master // Wait for the RS report to go across to the master, so that the master
// is aware of which sequence ids have been flushed, before we kill the RS. // is aware of which sequence ids have been flushed, before we kill the RS.
// If in production, the RS dies before the report goes across, we will // If in production, the RS dies before the report goes across, we will
// safely replay all the edits. // safely replay all the edits.
Thread.sleep(2000); Thread.sleep(2000);
// Abort the region server where we have the region hosted. // Abort the region server where we have the region hosted.
HRegionServer rs = desiredRegionAndServer.getSecond(); HRegionServer rs = desiredRegionAndServer.getSecond();
rs.abort("testing"); rs.abort("testing");
// The aborted region server's regions will be eventually assigned to some // The aborted region server's regions will be eventually assigned to some
// other region server, and the get RPC call (inside verifyEdit()) will // other region server, and the get RPC call (inside verifyEdit()) will
// retry for some time till the regions come back up. // retry for some time till the regions come back up.
// Verify that all the edits are safe. // Verify that all the edits are safe.
for (int i = 1; i <= 80; i++) { for (int i = 1; i <= 80; i++) {
verifyEdit(1, i, table); verifyEdit(1, i, table);
if (i <= 10) { if (i <= 10) {
verifyEdit(2, i, table); verifyEdit(2, i, table);
verifyEdit(3, i, table); verifyEdit(3, i, table);
}
} }
} finally {
TEST_UTIL.shutdownMiniCluster();
} }
TEST_UTIL.shutdownMiniCluster();
} }
// Test Log Replay with Distributed Replay on. // Test Log Replay with Distributed Replay on.
@ -426,6 +428,7 @@ public class TestPerColumnFamilyFlush {
*/ */
@Test @Test
public void testFlushingWhenLogRolling() throws Exception { public void testFlushingWhenLogRolling() throws Exception {
TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
@ -439,55 +442,52 @@ public class TestPerColumnFamilyFlush {
int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
final int numRegionServers = 4; final int numRegionServers = 4;
TEST_UTIL.startMiniCluster(numRegionServers); try {
TEST_UTIL.getHBaseAdmin().createNamespace( TEST_UTIL.startMiniCluster(numRegionServers);
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); HTable table = TEST_UTIL.createTable(tableName, families);
HTable table = TEST_UTIL.createTable(TABLENAME, families); HRegion desiredRegion = getRegionWithName(tableName).getFirst();
HTableDescriptor htd = table.getTableDescriptor(); assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
LOG.info("Writing to region=" + desiredRegion);
for (byte[] family : families) { // Add some edits. Most will be for CF1, some for CF2 and CF3.
if (!htd.hasFamily(family)) { for (int i = 1; i <= 10000; i++) {
htd.addFamily(new HColumnDescriptor(family)); table.put(createPut(1, i));
if (i <= 200) {
table.put(createPut(2, i));
table.put(createPut(3, i));
}
table.flushCommits();
// Keep adding until we exceed the number of log files, so that we are
// able to trigger the cleaning of old log files.
int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
if (currentNumLogFiles > maxLogs) {
LOG.info("The number of log files is now: " + currentNumLogFiles
+ ". Expect a log roll and memstore flush.");
break;
}
} }
table.close();
// Wait for some time till the flush caused by log rolling happens.
Thread.sleep(4000);
LOG.info("Finished waiting on flush after too many WALs...");
// We have artificially created the conditions for a log roll. When a
// log roll happens, we should flush all the column families. Testing that
// case here.
// Individual families should have been flushed.
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY1).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY2).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY3).getMemStoreSize());
// And of course, the total memstore should also be clean.
assertEquals(0, desiredRegion.getMemstoreSize().get());
} finally {
TEST_UTIL.shutdownMiniCluster();
} }
HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst();
assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
// Add some edits. Most will be for CF1, some for CF2 and CF3.
for (int i = 1; i <= 10000; i++) {
table.put(createPut(1, i));
if (i <= 200) {
table.put(createPut(2, i));
table.put(createPut(3, i));
}
table.flushCommits();
// Keep adding until we exceed the number of log files, so that we are
// able to trigger the cleaning of old log files.
int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
if (currentNumLogFiles > maxLogs) {
LOG.info("The number of log files is now: " + currentNumLogFiles
+ ". Expect a log roll and memstore flush.");
break;
}
}
table.close();
// Wait for some time till the flush caused by log rolling happens.
Thread.sleep(4000);
// We have artificially created the conditions for a log roll. When a
// log roll happens, we should flush all the column families. Testing that
// case here.
// Individual families should have been flushed.
assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
// And of course, the total memstore should also be clean.
assertEquals(0, desiredRegion.getMemstoreSize().get());
TEST_UTIL.shutdownMiniCluster();
} }
private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException { private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
@ -535,40 +535,52 @@ public class TestPerColumnFamilyFlush {
htd.addFamily(new HColumnDescriptor(FAMILY3)); htd.addFamily(new HColumnDescriptor(FAMILY3));
LOG.info("==============Test with selective flush disabled==============="); LOG.info("==============Test with selective flush disabled===============");
TEST_UTIL.startMiniCluster(1); int cf1StoreFileCount = -1;
TEST_UTIL.getHBaseAdmin().createNamespace( int cf2StoreFileCount = -1;
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); int cf3StoreFileCount = -1;
TEST_UTIL.getHBaseAdmin().createTable(htd); int cf1StoreFileCount1 = -1;
TEST_UTIL.waitTableAvailable(TABLENAME); int cf2StoreFileCount1 = -1;
Connection conn = ConnectionFactory.createConnection(conf); int cf3StoreFileCount1 = -1;
Table table = conn.getTable(TABLENAME); try {
doPut(table, memstoreFlushSize); TEST_UTIL.startMiniCluster(1);
table.close(); TEST_UTIL.getHBaseAdmin().createNamespace(
conn.close(); NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
TEST_UTIL.getHBaseAdmin().createTable(htd);
TEST_UTIL.waitTableAvailable(TABLENAME);
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TABLENAME);
doPut(table, memstoreFlushSize);
table.close();
conn.close();
HRegion region = getRegionWithName(TABLENAME).getFirst(); HRegion region = getRegionWithName(TABLENAME).getFirst();
int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
TEST_UTIL.shutdownMiniCluster(); } finally {
TEST_UTIL.shutdownMiniCluster();
}
LOG.info("==============Test with selective flush enabled==============="); LOG.info("==============Test with selective flush enabled===============");
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
TEST_UTIL.startMiniCluster(1); try {
TEST_UTIL.getHBaseAdmin().createNamespace( TEST_UTIL.startMiniCluster(1);
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); TEST_UTIL.getHBaseAdmin().createNamespace(
TEST_UTIL.getHBaseAdmin().createTable(htd); NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
conn = ConnectionFactory.createConnection(conf); TEST_UTIL.getHBaseAdmin().createTable(htd);
table = conn.getTable(TABLENAME); Connection conn = ConnectionFactory.createConnection(conf);
doPut(table, memstoreFlushSize); Table table = conn.getTable(TABLENAME);
table.close(); doPut(table, memstoreFlushSize);
conn.close(); table.close();
conn.close();
region = getRegionWithName(TABLENAME).getFirst(); region = getRegionWithName(TABLENAME).getFirst();
int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
TEST_UTIL.shutdownMiniCluster(); } finally {
TEST_UTIL.shutdownMiniCluster();
}
LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
+ ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "