diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
index c360985ba5e..8867611baca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -74,6 +74,7 @@ public abstract class PressureAwareThroughputController extends Configured imple
   protected int tuningPeriod;
 
   private volatile double maxThroughput;
+  private volatile double maxThroughputPerOperation;
 
   protected final ConcurrentMap<String, ActiveOperation> activeOperations =
       new ConcurrentHashMap<String, ActiveOperation>();
@@ -96,6 +97,7 @@ public abstract class PressureAwareThroughputController extends Configured imple
   @Override
   public void start(String opName) {
     activeOperations.put(opName, new ActiveOperation());
+    maxThroughputPerOperation = getMaxThroughput() / activeOperations.size();
   }
 
   @Override
@@ -107,8 +109,7 @@ public abstract class PressureAwareThroughputController extends Configured imple
       return 0;
     }
     long now = EnvironmentEdgeManager.currentTime();
-    double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size();
-    long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
+    long minTimeAllowed = (long) (deltaSize / maxThroughputPerOperation * 1000); // ms
     long elapsedTime = now - operation.lastControlTime;
     operation.lastControlSize = operation.totalSize;
     if (elapsedTime >= minTimeAllowed) {
@@ -123,7 +124,7 @@ public abstract class PressureAwareThroughputController extends Configured imple
         LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns");
         LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is "
             + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
-            + throughputDesc(maxThroughputPerCompaction) + ", already slept "
+            + throughputDesc(maxThroughputPerOperation) + ", already slept "
            + operation.numberOfSleeps + " time(s) and total slept time is "
            + operation.totalSleepTime + " ms till now.");
         operation.lastLogTime = now;
@@ -147,6 +148,7 @@ public abstract class PressureAwareThroughputController extends Configured imple
   @Override
   public void finish(String opName) {
     ActiveOperation operation = activeOperations.remove(opName);
+    maxThroughputPerOperation = getMaxThroughput() / activeOperations.size();
     long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime;
     LOG.info(opName + " average throughput is "
         + throughputDesc(operation.totalSize, elapsedTime) + ", slept "
@@ -173,5 +175,6 @@ public abstract class PressureAwareThroughputController extends Configured imple
 
   public void setMaxThroughput(double maxThroughput) {
     this.maxThroughput = maxThroughput;
+    maxThroughputPerOperation = getMaxThroughput() / activeOperations.size();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
index ae6b0368bab..f328b1c7d93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
@@ -16,6 +16,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -38,27 +38,42 @@ import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 @Category(MediumTests.class)
 public class TestFlushWithThroughputController {
-
   private static final Log LOG = LogFactory.getLog(TestFlushWithThroughputController.class);
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
   private static final double EPSILON = 1E-6;
 
-  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
-
+  private HBaseTestingUtility hbtu;
+  @Rule public TestName testName = new TestName();
+  private TableName tableName;
   private final byte[] family = Bytes.toBytes("f");
-
   private final byte[] qualifier = Bytes.toBytes("q");
 
+  @Before
+  public void setUp() {
+    hbtu = new HBaseTestingUtility();
+    tableName = TableName.valueOf("Table-" + testName.getMethodName());
+    hbtu.getConfiguration().set(
+        FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
+        PressureAwareFlushThroughputController.class.getName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hbtu.shutdownMiniCluster();
+  }
+
   private Store getStoreWithName(TableName tableName) {
-    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    MiniHBaseCluster cluster = hbtu.getMiniHBaseCluster();
     List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
     for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
       HRegionServer hrs = rsts.get(i).getRegionServer();
@@ -69,84 +84,64 @@ public class TestFlushWithThroughputController {
     return null;
   }
 
-  private Store generateAndFlushData() throws IOException {
-    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
-    if (admin.tableExists(tableName)) {
-      admin.disableTable(tableName);
-      admin.deleteTable(tableName);
-    }
-    Table table = TEST_UTIL.createTable(tableName, family);
+  private void setMaxMinThroughputs(long max, long min) {
+    Configuration conf = hbtu.getConfiguration();
+    conf.setLong(
+        PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min);
+    conf.setLong(
+        PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max);
+  }
+
+  /**
+   * Writes Puts to the table and flushes a few times.
+   * @return {@link Pair} of (throughput, duration).
+   */
+  private Pair<Double, Long> generateAndFlushData(Table table) throws IOException {
+    // Internally, throughput is controlled after every cell write, so keep the value size small
+    // for better control.
+    final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024;
     Random rand = new Random();
-    for (int i = 0; i < 10; i++) {
-      for (int j = 0; j < 10; j++) {
-        byte[] value = new byte[256 * 1024];
+    long duration = 0;
+    for (int i = 0; i < NUM_FLUSHES; i++) {
+      // Write about 10MB (10 times the throughput rate) per iteration.
+      for (int j = 0; j < NUM_PUTS; j++) {
+        byte[] value = new byte[VALUE_SIZE];
         rand.nextBytes(value);
         table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
       }
-      admin.flush(tableName);
+      long startTime = System.nanoTime();
+      hbtu.getHBaseAdmin().flush(tableName);
+      duration += System.nanoTime() - startTime;
     }
-    return getStoreWithName(tableName);
+    Store store = getStoreWithName(tableName);
+    assertEquals(NUM_FLUSHES, store.getStorefilesCount());
+    double throughput = (double)store.getStorefilesSize()
+        / TimeUnit.NANOSECONDS.toSeconds(duration);
+    return new Pair<>(throughput, duration);
   }
 
   private long testFlushWithThroughputLimit() throws Exception {
-    long throughputLimit = 1L * 1024 * 1024;
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
-      PressureAwareFlushThroughputController.class.getName());
-    conf.setLong(
-      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
-      throughputLimit);
-    conf.setLong(
-      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
-      throughputLimit);
+    final long throughputLimit = 1024 * 1024;
+    setMaxMinThroughputs(throughputLimit, throughputLimit);
+    Configuration conf = hbtu.getConfiguration();
     conf.setLong(
       PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
       throughputLimit);
-    TEST_UTIL.startMiniCluster(1);
-    try {
-      long startTime = System.nanoTime();
-      Store store = generateAndFlushData();
-      assertEquals(10, store.getStorefilesCount());
-      long duration = System.nanoTime() - startTime;
-      double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
-      LOG.debug("Throughput is: " + (throughput / 1024 / 1024) + " MB/s");
-      // confirm that the speed limit work properly(not too fast, and also not too slow)
-      // 20% is the max acceptable error rate.
-      assertTrue(throughput < throughputLimit * 1.2);
-      assertTrue(throughput > throughputLimit * 0.8);
-      return duration;
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
-
-  private long testFlushWithoutThroughputLimit() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
-      NoLimitThroughputController.class.getName());
-    TEST_UTIL.startMiniCluster(1);
-    try {
-      long startTime = System.nanoTime();
-      Store store = generateAndFlushData();
-      assertEquals(10, store.getStorefilesCount());
-      long duration = System.nanoTime() - startTime;
-      double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
-      LOG.debug("Throughput w/o limit is: " + (throughput / 1024 / 1024) + " MB/s");
-      return duration;
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
+    hbtu.startMiniCluster(1);
+    Table table = hbtu.createTable(tableName, family);
+    Pair<Double, Long> result = generateAndFlushData(table);
+    hbtu.deleteTable(tableName);
+    LOG.debug("Throughput is: " + (result.getFirst() / 1024 / 1024) + " MB/s");
+    // confirm that the speed limit works properly (not too fast, and also not too slow)
+    // 20% is the max acceptable error rate.
+    assertTrue(result.getFirst() < throughputLimit * 1.2);
+    assertTrue(result.getFirst() > throughputLimit * 0.8);
+    return result.getSecond();
   }
 
   @Test
   public void testFlushControl() throws Exception {
-    long limitTime = testFlushWithThroughputLimit();
-    long noLimitTime = testFlushWithoutThroughputLimit();
-    LOG.info("With 1M/s limit, flush use " + (limitTime / 1000000)
-        + "ms; without limit, flush use " + (noLimitTime / 1000000) + "ms");
-    // Commonly if multiple region flush at the same time, the throughput could be very high
-    // but flush in this test is in serial, so we use a weak assumption.
-    assertTrue(limitTime > 2 * noLimitTime);
+    testFlushWithThroughputLimit();
   }
 
   /**
@@ -154,57 +149,46 @@ public class TestFlushWithThroughputController {
    */
   @Test
   public void testFlushThroughputTuning() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
+    Configuration conf = hbtu.getConfiguration();
+    setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
-    conf.setLong(
-      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
-      20L * 1024 * 1024);
-    conf.setLong(
-      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
-      10L * 1024 * 1024);
-    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
-      PressureAwareFlushThroughputController.class.getName());
     conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
       3000);
-    TEST_UTIL.startMiniCluster(1);
+    hbtu.startMiniCluster(1);
     Connection conn = ConnectionFactory.createConnection(conf);
-    try {
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      htd.addFamily(new HColumnDescriptor(family));
-      htd.setCompactionEnabled(false);
-      TEST_UTIL.getHBaseAdmin().createTable(htd);
-      TEST_UTIL.waitTableAvailable(tableName);
-      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-      PressureAwareFlushThroughputController throughputController =
-          (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
-      for (Region region : regionServer.getOnlineRegions()) {
-        region.flush(true);
-      }
-      assertEquals(0.0, regionServer.getFlushPressure(), EPSILON);
-      Thread.sleep(5000);
-      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
-      Table table = conn.getTable(tableName);
-      Random rand = new Random();
-      for (int i = 0; i < 10; i++) {
-        for (int j = 0; j < 10; j++) {
-          byte[] value = new byte[256 * 1024];
-          rand.nextBytes(value);
-          table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
-        }
-      }
-      Thread.sleep(5000);
-      double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
-      assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);
-
-      conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
-        NoLimitThroughputController.class.getName());
-      regionServer.onConfigurationChange(conf);
-      assertTrue(throughputController.isStopped());
-      assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
-    } finally {
-      conn.close();
-      TEST_UTIL.shutdownMiniCluster();
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+    htd.setCompactionEnabled(false);
+    hbtu.getHBaseAdmin().createTable(htd);
+    hbtu.waitTableAvailable(tableName);
+    HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
+    PressureAwareFlushThroughputController throughputController =
+        (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
+    for (Region region : regionServer.getOnlineRegions()) {
+      region.flush(true);
     }
+    assertEquals(0.0, regionServer.getFlushPressure(), EPSILON);
+    Thread.sleep(5000);
+    assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
+    Table table = conn.getTable(tableName);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[256 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+    }
+    Thread.sleep(5000);
+    double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
+    assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);
+
+    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
+      NoLimitThroughputController.class.getName());
+    regionServer.onConfigurationChange(conf);
+    assertTrue(throughputController.isStopped());
+    assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
+    conn.close();
   }
 
   /**
@@ -212,8 +196,8 @@ public class TestFlushWithThroughputController {
    */
   @Test
   public void testFlushControlForStripedStore() throws Exception {
-    TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+    hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
       StripeStoreEngine.class.getName());
-    testFlushControl();
+    testFlushWithThroughputLimit();
   }
 }
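
The server-side hunks all implement one idea: hoist the `getMaxThroughput() / activeOperations.size()` division out of the hot `control()` path and cache it in a volatile `maxThroughputPerOperation` field that is refreshed only at the three points where its inputs can change: `start()`, `finish()`, and `setMaxThroughput()`. Below is a minimal standalone sketch of that recompute-on-change pattern; the class and method names (`PerOperationQuota`, `minTimeAllowedMs`) are hypothetical illustrations, not HBase API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of the caching pattern applied by this patch. */
public class PerOperationQuota {
  private final ConcurrentMap<String, Long> activeOperations = new ConcurrentHashMap<>();
  // Total budget and cached per-operation share; volatile so the hot path
  // sees updates without locking, mirroring the patch's field declarations.
  private volatile double maxThroughput;
  private volatile double maxThroughputPerOperation;

  public PerOperationQuota(double maxThroughput) {
    this.maxThroughput = maxThroughput;
  }

  public void start(String opName) {
    activeOperations.put(opName, System.currentTimeMillis());
    recompute();
  }

  public void finish(String opName) {
    activeOperations.remove(opName);
    // As in the patch, when the last operation finishes size() is 0 and the
    // double division yields Infinity; that is harmless because only
    // registered operations ever reach the hot path.
    recompute();
  }

  public void setMaxThroughput(double maxThroughput) {
    this.maxThroughput = maxThroughput;
    recompute();
  }

  /**
   * Hot path, called once per written chunk: a volatile read of the cached
   * share instead of recomputing maxThroughput / activeOperations.size().
   */
  public long minTimeAllowedMs(long deltaSize) {
    return (long) (deltaSize / maxThroughputPerOperation * 1000);
  }

  private void recompute() {
    maxThroughputPerOperation = maxThroughput / activeOperations.size();
  }
}
```

The trade-off is that membership changes (operation start/finish, limit retuning) are rare relative to `control()` calls, which fire after every cell write, so paying a recompute at the rare events and a plain volatile read on the hot path is cheaper than dividing on every call. A `control()` invocation that races with a refresh may briefly use the previous share, which appears acceptable here since the limit is a soft throttling target rather than a hard guarantee.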