HBASE-13242 TestPerColumnFamilyFlush.testFlushingWhenLogRolling hung
This commit is contained in:
parent
c3b47f2251
commit
cb4db89bff
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
@ -52,7 +54,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -121,7 +122,7 @@ public class TestPerColumnFamilyFlush {
|
||||||
Arrays.equals(r.getFamilyMap(family).get(qf), val));
|
Arrays.equals(r.getFamilyMap(family).get(qf), val));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testSelectiveFlushWhenEnabled() throws IOException {
|
public void testSelectiveFlushWhenEnabled() throws IOException {
|
||||||
// Set up the configuration
|
// Set up the configuration
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
@ -259,7 +260,7 @@ public class TestPerColumnFamilyFlush {
|
||||||
assertEquals(0, region.getMemstoreSize().get());
|
assertEquals(0, region.getMemstoreSize().get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testSelectiveFlushWhenNotEnabled() throws IOException {
|
public void testSelectiveFlushWhenNotEnabled() throws IOException {
|
||||||
// Set up the configuration
|
// Set up the configuration
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
@ -337,7 +338,7 @@ public class TestPerColumnFamilyFlush {
|
||||||
try {
|
try {
|
||||||
TEST_UTIL.startMiniCluster(numRegionServers);
|
TEST_UTIL.startMiniCluster(numRegionServers);
|
||||||
TEST_UTIL.getHBaseAdmin().createNamespace(
|
TEST_UTIL.getHBaseAdmin().createNamespace(
|
||||||
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
||||||
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
|
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
|
||||||
HTableDescriptor htd = table.getTableDescriptor();
|
HTableDescriptor htd = table.getTableDescriptor();
|
||||||
|
|
||||||
|
@ -414,46 +415,50 @@ public class TestPerColumnFamilyFlush {
|
||||||
// In distributed log replay, the log splitters ask the master for the
|
// In distributed log replay, the log splitters ask the master for the
|
||||||
// last flushed sequence id for a region. This test would ensure that we
|
// last flushed sequence id for a region. This test would ensure that we
|
||||||
// are doing the book-keeping correctly.
|
// are doing the book-keeping correctly.
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testLogReplayWithDistributedReplay() throws Exception {
|
public void testLogReplayWithDistributedReplay() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
||||||
doTestLogReplay();
|
doTestLogReplay();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test Log Replay with Distributed log split on.
|
// Test Log Replay with Distributed log split on.
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testLogReplayWithDistributedLogSplit() throws Exception {
|
public void testLogReplayWithDistributedLogSplit() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
|
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
|
||||||
doTestLogReplay();
|
doTestLogReplay();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getNumRolledLogFiles(HRegion region) {
|
||||||
|
return ((FSHLog) region.getWAL()).getNumRolledLogFiles();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When a log roll is about to happen, we do a flush of the regions who will be affected by the
|
* When a log roll is about to happen, we do a flush of the regions who will be affected by the
|
||||||
* log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
|
* log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
|
||||||
* test ensures that we do a full-flush in that scenario.
|
* test ensures that we do a full-flush in that scenario.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testFlushingWhenLogRolling() throws Exception {
|
public void testFlushingWhenLogRolling() throws Exception {
|
||||||
TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
|
TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
|
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
|
||||||
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
|
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
|
||||||
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000);
|
long cfFlushSizeLowerBound = 2048;
|
||||||
|
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
|
||||||
|
cfFlushSizeLowerBound);
|
||||||
|
|
||||||
// Also, let us try real hard to get a log roll to happen.
|
// One hour, prevent periodic rolling
|
||||||
// Keeping the log roll period to 2s.
|
conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
|
||||||
conf.setLong("hbase.regionserver.logroll.period", 2000);
|
// prevent rolling by size
|
||||||
// Keep the block size small so that we fill up the log files very fast.
|
conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
|
||||||
conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
|
|
||||||
// Make it 10 as max logs before a flush comes on.
|
// Make it 10 as max logs before a flush comes on.
|
||||||
final int walcount = 10;
|
final int maxLogs = 10;
|
||||||
conf.setInt("hbase.regionserver.maxlogs", walcount);
|
conf.setInt("hbase.regionserver.maxlogs", maxLogs);
|
||||||
int maxLogs = conf.getInt("hbase.regionserver.maxlogs", walcount);
|
|
||||||
|
|
||||||
final int numRegionServers = 4;
|
final int numRegionServers = 1;
|
||||||
|
TEST_UTIL.startMiniCluster(numRegionServers);
|
||||||
try {
|
try {
|
||||||
TEST_UTIL.startMiniCluster(numRegionServers);
|
|
||||||
HTable table = null;
|
HTable table = null;
|
||||||
table = TEST_UTIL.createTable(tableName, FAMILIES);
|
table = TEST_UTIL.createTable(tableName, FAMILIES);
|
||||||
// Force flush the namespace table so edits to it are not hanging around as oldest
|
// Force flush the namespace table so edits to it are not hanging around as oldest
|
||||||
|
@ -462,36 +467,56 @@ public class TestPerColumnFamilyFlush {
|
||||||
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
|
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
|
||||||
admin.flush(TableName.NAMESPACE_TABLE_NAME);
|
admin.flush(TableName.NAMESPACE_TABLE_NAME);
|
||||||
}
|
}
|
||||||
HRegion desiredRegion = getRegionWithName(tableName).getFirst();
|
Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
|
||||||
|
final HRegion desiredRegion = desiredRegionAndServer.getFirst();
|
||||||
assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
|
assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
|
||||||
LOG.info("Writing to region=" + desiredRegion);
|
LOG.info("Writing to region=" + desiredRegion);
|
||||||
|
|
||||||
// Add some edits. Most will be for CF1, some for CF2 and CF3.
|
// Add one row for both CFs.
|
||||||
for (int i = 1; i <= 10000; i++) {
|
for (int i = 1; i <= 3; i++) {
|
||||||
table.put(createPut(1, i));
|
table.put(createPut(i, 0));
|
||||||
if (i <= 200) {
|
}
|
||||||
table.put(createPut(2, i));
|
// Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
|
||||||
table.put(createPut(3, i));
|
// bound and CF2 and CF3 are smaller than the lower bound.
|
||||||
|
for (int i = 0; i < maxLogs; i++) {
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
table.put(createPut(1, i * 100 + j));
|
||||||
}
|
}
|
||||||
table.flushCommits();
|
table.flushCommits();
|
||||||
// Keep adding until we exceed the number of log files, so that we are
|
// Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
|
||||||
// able to trigger the cleaning of old log files.
|
int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
|
||||||
int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
|
assertNull(desiredRegion.getWAL().rollWriter());
|
||||||
if (currentNumLogFiles > maxLogs) {
|
while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
|
||||||
LOG.info("The number of log files is now: " + currentNumLogFiles
|
Thread.sleep(100);
|
||||||
+ ". Expect a log roll and memstore flush.");
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
table.close();
|
table.close();
|
||||||
|
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
|
||||||
|
assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
|
||||||
|
assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
|
||||||
|
assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound);
|
||||||
|
table.put(createPut(1, 12345678));
|
||||||
|
table.flushCommits();
|
||||||
|
// Make numRolledLogFiles greater than maxLogs
|
||||||
|
desiredRegionAndServer.getSecond().walRoller.requestRollAll();
|
||||||
// Wait for some time till the flush caused by log rolling happens.
|
// Wait for some time till the flush caused by log rolling happens.
|
||||||
while (((FSHLog) (desiredRegion.getWAL())).getNumLogFiles() > maxLogs) Threads.sleep(100);
|
TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return desiredRegion.getMemstoreSize().get() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String explainFailure() throws Exception {
|
||||||
|
long memstoreSize = desiredRegion.getMemstoreSize().get();
|
||||||
|
if (memstoreSize > 0) {
|
||||||
|
return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
|
||||||
|
}
|
||||||
|
return "Unknown";
|
||||||
|
}
|
||||||
|
});
|
||||||
LOG.info("Finished waiting on flush after too many WALs...");
|
LOG.info("Finished waiting on flush after too many WALs...");
|
||||||
|
|
||||||
// We have artificially created the conditions for a log roll. When a
|
|
||||||
// log roll happens, we should flush all the column families. Testing that
|
|
||||||
// case here.
|
|
||||||
|
|
||||||
// Individual families should have been flushed.
|
// Individual families should have been flushed.
|
||||||
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
|
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
|
||||||
desiredRegion.getStore(FAMILY1).getMemStoreSize());
|
desiredRegion.getStore(FAMILY1).getMemStoreSize());
|
||||||
|
@ -499,9 +524,9 @@ public class TestPerColumnFamilyFlush {
|
||||||
desiredRegion.getStore(FAMILY2).getMemStoreSize());
|
desiredRegion.getStore(FAMILY2).getMemStoreSize());
|
||||||
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
|
assertEquals(DefaultMemStore.DEEP_OVERHEAD,
|
||||||
desiredRegion.getStore(FAMILY3).getMemStoreSize());
|
desiredRegion.getStore(FAMILY3).getMemStoreSize());
|
||||||
|
// let WAL cleanOldLogs
|
||||||
// And of course, the total memstore should also be clean.
|
assertNull(desiredRegion.getWAL().rollWriter(true));
|
||||||
assertEquals(0, desiredRegion.getMemstoreSize().get());
|
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
|
||||||
} finally {
|
} finally {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
@ -534,7 +559,7 @@ public class TestPerColumnFamilyFlush {
|
||||||
|
|
||||||
// Under the same write load, small stores should have less store files when
|
// Under the same write load, small stores should have less store files when
|
||||||
// percolumnfamilyflush enabled.
|
// percolumnfamilyflush enabled.
|
||||||
@Test (timeout=180000)
|
@Test(timeout = 180000)
|
||||||
public void testCompareStoreFileCount() throws Exception {
|
public void testCompareStoreFileCount() throws Exception {
|
||||||
long memstoreFlushSize = 1024L * 1024;
|
long memstoreFlushSize = 1024L * 1024;
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
@ -561,7 +586,7 @@ public class TestPerColumnFamilyFlush {
|
||||||
try {
|
try {
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(1);
|
||||||
TEST_UTIL.getHBaseAdmin().createNamespace(
|
TEST_UTIL.getHBaseAdmin().createNamespace(
|
||||||
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
||||||
TEST_UTIL.getHBaseAdmin().createTable(htd);
|
TEST_UTIL.getHBaseAdmin().createTable(htd);
|
||||||
TEST_UTIL.waitTableAvailable(TABLENAME);
|
TEST_UTIL.waitTableAvailable(TABLENAME);
|
||||||
Connection conn = ConnectionFactory.createConnection(conf);
|
Connection conn = ConnectionFactory.createConnection(conf);
|
||||||
|
@ -578,12 +603,12 @@ public class TestPerColumnFamilyFlush {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("==============Test with selective flush enabled===============");
|
LOG.info("==============Test with selective flush enabled===============");
|
||||||
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
|
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
|
||||||
try {
|
try {
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(1);
|
||||||
TEST_UTIL.getHBaseAdmin().createNamespace(
|
TEST_UTIL.getHBaseAdmin().createNamespace(
|
||||||
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
|
||||||
TEST_UTIL.getHBaseAdmin().createTable(htd);
|
TEST_UTIL.getHBaseAdmin().createTable(htd);
|
||||||
Connection conn = ConnectionFactory.createConnection(conf);
|
Connection conn = ConnectionFactory.createConnection(conf);
|
||||||
Table table = conn.getTable(TABLENAME);
|
Table table = conn.getTable(TABLENAME);
|
||||||
|
|
Loading…
Reference in New Issue