HBASE-16301 Trigger flush without waiting when compaction is disabled on a table (huaxiang sun)
Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
parent b06119edef
commit 12549de43c
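For context: this patch matters for tables whose descriptor has compaction turned off. A minimal client-side sketch of creating such a table (standard HBase 1.x client API; the table and family names here are illustrative, not taken from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateNoCompactionTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("demo"));
      htd.addFamily(new HColumnDescriptor("f"));
      // With compaction disabled, store files only accumulate; before this
      // patch, flushes on such a table could stall waiting for a compaction
      // that would never run.
      htd.setCompactionEnabled(false);
      admin.createTable(htd);
    }
  }
}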
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -547,6 +547,12 @@ class MemStoreFlusher implements FlushRequester {
   }
 
   private boolean isTooManyStoreFiles(Region region) {
+
+    // When compaction is disabled, the region is flushable
+    if (!region.getTableDesc().isCompactionEnabled()) {
+      return false;
+    }
+
     for (Store store : region.getStores()) {
       if (store.hasTooManyStoreFiles()) {
         return true;
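The early return above is the whole fix: isTooManyStoreFiles() is what the flusher consults to decide whether to hold a flush back until compaction reduces the store-file count, and with compaction disabled that wait could never end. A simplified, self-contained model of the patched decision (plain Java stand-ins, not HBase classes; all names and the >= threshold comparison are illustrative):

import java.util.Arrays;
import java.util.List;

class FlushGateSketch {
  // Stand-in for a region's store with its current file count.
  static final class StoreModel {
    final int storeFileCount;
    StoreModel(int storeFileCount) { this.storeFileCount = storeFileCount; }
  }

  // Mirrors the patched check: when compaction is disabled, report
  // "not too many" so the flush proceeds instead of waiting.
  static boolean isTooManyStoreFiles(boolean compactionEnabled,
      List<StoreModel> stores, int blockingStoreFiles) {
    if (!compactionEnabled) {
      return false;
    }
    for (StoreModel store : stores) {
      if (store.storeFileCount >= blockingStoreFiles) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<StoreModel> stores = Arrays.asList(new StoreModel(10));
    // Compaction enabled: 10 files >= 3 would delay the flush (prints true).
    System.out.println(isTooManyStoreFiles(true, stores, 3));
    // Compaction disabled: the flush is never held back (prints false).
    System.out.println(isTooManyStoreFiles(false, stores, 3));
  }
}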
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -17,17 +17,26 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
@@ -39,15 +48,61 @@ public class TestCompactSplitThread {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
   private final byte[] family = Bytes.toBytes("f");
+  private static final int NUM_RS = 1;
+  private static final int blockingStoreFiles = 3;
+  private static Path rootDir;
+  private static FileSystem fs;
 
-  @Test
-  public void testThreadPoolSizeTuning() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
+  /**
+   * Setup the config for the cluster
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(NUM_RS);
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    rootDir = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+  }
+
+  private static void setupConf(Configuration conf) {
+    // disable the ui
+    conf.setInt("hbase.regionsever.info.port", -1);
+    // so make sure we get a compaction when doing a load, but keep around some
+    // files in the store
+    conf.setInt("hbase.hstore.compaction.min", 2);
+    conf.setInt("hbase.hstore.compactionThreshold", 5);
+    // change the flush size to a small amount, regulating number of store files
+    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+
+    // block writes if we get to blockingStoreFiles store files
+    conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles);
+    // Ensure no extra cleaners on by default (e.g. TimeToLiveHFileCleaner)
     conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
     conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
     conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
     conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
-    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.deleteTable(tableName);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      TEST_UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      // NOOP;
+    }
+  }
+
+  @Test
+  public void testThreadPoolSizeTuning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
     Connection conn = ConnectionFactory.createConnection(conf);
     try {
       HTableDescriptor htd = new HTableDescriptor(tableName);
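Note the restructuring in this hunk: cluster startup moves from inside testThreadPoolSizeTuning() into a @BeforeClass/@AfterClass pair so one mini-cluster is shared by both tests, setupConf() centralizes the store-file knobs (blockingStoreFiles = 3 in particular), and @After deletes the table between tests so each test starts from a clean table.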
@@ -98,7 +153,26 @@ public class TestCompactSplitThread {
       assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
     } finally {
       conn.close();
-      TEST_UTIL.shutdownMiniCluster();
     }
   }
+
+  @Test(timeout = 60000)
+  public void testFlushWithTableCompactionDisabled() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.setCompactionEnabled(false);
+    TEST_UTIL.createTable(htd, new byte[][] { family }, null);
+
+    // load the table
+    for (int i = 0; i < blockingStoreFiles + 1; i ++) {
+      TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family);
+      TEST_UTIL.flush(tableName);
+    }
+
+    // Make sure that store file number is greater than blockingStoreFiles + 1
+    Path tableDir = FSUtils.getTableDir(rootDir, tableName);
+    Collection<String> hfiles = SnapshotTestingUtils.listHFileNames(fs, tableDir);
+    assert(hfiles.size() > blockingStoreFiles + 1);
+  }
 }
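What the new test verifies: with compaction disabled nothing ever merges store files away, and the small flush size (25000) plus the explicit flush on each loop iteration keep creating new HFiles. The closing assertion that more than blockingStoreFiles + 1 HFiles exist is the behavioral signal for this fix: if flushes were still being held back at the blockingStoreFiles threshold, the file count could not climb past it.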