HBASE-27389 Add cost function in balancer to consider the cost of building bucket cache before moving regions

This commit is contained in:
Rahul Agarkar 2023-03-31 19:06:07 +05:30
parent 160f484352
commit 55e55ea3c0
3 changed files with 132 additions and 28 deletions

View File

@ -770,6 +770,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
List<CostFunction> getCostFunctions() {
return costFunctions;
}
/** /**
* Update both the costs of costfunctions and the weights of candidate generators * Update both the costs of costfunctions and the weights of candidate generators
*/ */

View File

@ -24,11 +24,14 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -177,6 +180,19 @@ public class TestPrefetchCacheCostLoadBalancerFunction extends StochasticBalance
// No historical prefetch // No historical prefetch
}, }; }, };
private static Configuration storedConfiguration;
@BeforeClass
public static void saveInitialConfiguration() {
storedConfiguration = new Configuration(conf);
}
@Before
public void beforeEachTest() {
conf = new Configuration(storedConfiguration);
loadBalancer.onConfigurationChange(conf);
}
@Test @Test
public void testVerifyPrefetchCostFunctionEnabled() { public void testVerifyPrefetchCostFunctionEnabled() {
conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence"); conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence");
@ -189,13 +205,22 @@ public class TestPrefetchCacheCostLoadBalancerFunction extends StochasticBalance
} }
@Test @Test
public void testVerifyPrefetchCostFunctionDisabled() { public void testVerifyPrefetchCostFunctionDisabledByNoPersistencePathKey() {
assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames())
.contains(PrefetchCacheCostFunction.class.getSimpleName())); .contains(PrefetchCacheCostFunction.class.getSimpleName()));
} }
@Test @Test
public void testPrefetchCost() throws Exception { public void testVerifyPrefetchCostFunctionDisabledByNoMultiplier() {
conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence");
conf.setFloat("hbase.master.balancer.stochastic.prefetchCacheCost", 0.0f);
assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames())
.contains(PrefetchCacheCostFunction.class.getSimpleName()));
}
@Test
public void testPrefetchCost() {
conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence"); conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence");
CostFunction costFunction = new PrefetchCacheCostFunction(conf); CostFunction costFunction = new PrefetchCacheCostFunction(conf);
@ -211,14 +236,13 @@ public class TestPrefetchCacheCostLoadBalancerFunction extends StochasticBalance
} }
private class MockClusterForPrefetch extends BalancerClusterState { private class MockClusterForPrefetch extends BalancerClusterState {
private int[][] regionServerPrefetch = null; // [region][server] = prefetch percent private final int[][] regionServerPrefetch; // [region][server] = prefetch percent
public MockClusterForPrefetch(int[][] regionsArray) { public MockClusterForPrefetch(int[][] regionsArray) {
// regions[0] is an array where index = serverIndex and value = number of regions // regions[0] is an array where index = serverIndex and value = number of regions
super(mockClusterServers(regionsArray[0], 1), null, null, null, null); super(mockClusterServers(regionsArray[0], 1), null, null, null, null);
regionServerPrefetch = new int[regionsArray.length - 1][]; regionServerPrefetch = new int[regionsArray.length - 1][];
Map<String, Map<Address, Float>> historicalPrefetchRatio = Map<String, Map<Address, Float>> historicalPrefetchRatio = new HashMap<>();
new HashMap<String, Map<Address, Float>>();
for (int i = 1; i < regionsArray.length; i++) { for (int i = 1; i < regionsArray.length; i++) {
int regionIndex = i - 1; int regionIndex = i - 1;
regionServerPrefetch[regionIndex] = new int[regionsArray[i].length - 1]; regionServerPrefetch[regionIndex] = new int[regionsArray[i].length - 1];

View File

@ -22,9 +22,10 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -41,11 +42,9 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -66,7 +65,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
private static final int REGION_SERVERS = 3; private static final int REGION_SERVERS = 3;
private static final int REGION_NUM = REGION_SERVERS * 3; private static final int REGION_NUM = REGION_SERVERS * 50;
private Admin admin; private Admin admin;
@ -154,6 +153,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
// Disable the prefetch cache cost function // Disable the prefetch cache cost function
conf.setFloat("hbase.master.balancer.stochastic.prefetchCacheCost", 0.0f); conf.setFloat("hbase.master.balancer.stochastic.prefetchCacheCost", 0.0f);
loadBalancer.loadConf(conf);
TEST_UTIL.startMiniCluster(REGION_SERVERS); TEST_UTIL.startMiniCluster(REGION_SERVERS);
TEST_UTIL.getDFSCluster().waitClusterUp(); TEST_UTIL.getDFSCluster().waitClusterUp();
@ -168,7 +168,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
TEST_UTIL.waitTableAvailable(tableName); TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName); admin.flush(tableName);
compactTable(tableName); TEST_UTIL.compact(true);
// Validate that all the other cost functions are enabled // Validate that all the other cost functions are enabled
Arrays.stream(FunctionCostKeys.values()) Arrays.stream(FunctionCostKeys.values())
@ -189,7 +189,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
assertEquals(REGION_SERVERS, ssmap.size()); assertEquals(REGION_SERVERS, ssmap.size());
// Get the name of the region server to shutdown and restart // Get the name of the region server to shutdown and restart
ServerName serverName = cluster.getRegionServer(REGION_SERVERS - 1).getServerName(); ServerName serverName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1);
ServerMetrics sm = ssmap.get(serverName); ServerMetrics sm = ssmap.get(serverName);
// Verify that some regions are assigned to this region server // Verify that some regions are assigned to this region server
assertTrue(0.0f != sm.getRegionMetrics().size()); assertTrue(0.0f != sm.getRegionMetrics().size());
@ -199,10 +199,10 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
cluster.stopRegionServer(serverName); cluster.stopRegionServer(serverName);
cluster.waitForRegionServerToStop(serverName, 1000); cluster.waitForRegionServerToStop(serverName, 1000);
// Compact the table so that all the regions are reassigned to the running region servers // Compact the table so that all the regions are reassigned to the running region servers
compactTable(tableName); TEST_UTIL.compact(true);
TEST_UTIL.waitUntilNoRegionsInTransition(12000); TEST_UTIL.waitUntilNoRegionsInTransition(12000);
ssmap = admin.getClusterMetrics().getLiveServerMetrics(); ssmap = cluster.getClusterMetrics().getLiveServerMetrics();
assertEquals(REGION_SERVERS - 1, ssmap.size()); assertEquals(REGION_SERVERS - 1, ssmap.size());
sm = ssmap.get(serverName); sm = ssmap.get(serverName);
// Validate that no server metrics is found for the non-active server // Validate that no server metrics is found for the non-active server
@ -215,11 +215,13 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000); cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000);
admin.balance(); admin.balance();
TEST_UTIL.waitUntilNoRegionsInTransition(12000); TEST_UTIL.waitUntilNoRegionsInTransition(12000);
ssmap = admin.getClusterMetrics().getLiveServerMetrics(); ssmap = cluster.getClusterMetrics().getLiveServerMetrics();
assertEquals(REGION_SERVERS, ssmap.size()); assertEquals(REGION_SERVERS, ssmap.size());
serverName = cluster.getRegionServer(REGION_SERVERS - 1).getServerName(); ServerName newServerName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1);
sm = ssmap.get(serverName); // Verify that the same region server has been started
assertTrue(ServerName.isSameAddress(serverName, newServerName));
sm = ssmap.get(newServerName);
assertNotNull(sm); assertNotNull(sm);
assertTrue(sm.getRegionMetrics().size() > 0); assertTrue(sm.getRegionMetrics().size() > 0);
@ -243,6 +245,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
Arrays.stream(FunctionCostKeys.values()) Arrays.stream(FunctionCostKeys.values())
.forEach(functionCostKey -> conf.setFloat(functionCostKey.getValue(), 0.0f)); .forEach(functionCostKey -> conf.setFloat(functionCostKey.getValue(), 0.0f));
loadBalancer.loadConf(conf);
TEST_UTIL.startMiniCluster(REGION_SERVERS); TEST_UTIL.startMiniCluster(REGION_SERVERS);
TEST_UTIL.getDFSCluster().waitClusterUp(); TEST_UTIL.getDFSCluster().waitClusterUp();
@ -256,7 +259,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
TEST_UTIL.waitTableAvailable(tableName); TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName); admin.flush(tableName);
compactTable(tableName); TEST_UTIL.compact(true);
// Validate that all the other cost functions are disabled // Validate that all the other cost functions are disabled
Arrays.stream(FunctionCostKeys.values()) Arrays.stream(FunctionCostKeys.values())
@ -274,6 +277,7 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
TEST_UTIL.waitUntilNoRegionsInTransition(120000); TEST_UTIL.waitUntilNoRegionsInTransition(120000);
Map<ServerName, ServerMetrics> ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); Map<ServerName, ServerMetrics> ssmap = cluster.getClusterMetrics().getLiveServerMetrics();
assertEquals(REGION_SERVERS, ssmap.size()); assertEquals(REGION_SERVERS, ssmap.size());
// Shutdown the last server. This is because the server id for an inactive server is reassigned // Shutdown the last server. This is because the server id for an inactive server is reassigned
@ -281,15 +285,15 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
// available // available
// server id. In our case, we want to track the same server and hence, it's safe to restart the // server id. In our case, we want to track the same server and hence, it's safe to restart the
// last server in the list // last server in the list
ServerName serverName = cluster.getRegionServer(REGION_SERVERS - 1).getServerName(); ServerName serverName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1);
ServerMetrics sm = ssmap.get(serverName); ServerMetrics sm = ssmap.get(serverName);
assertTrue(0 != sm.getRegionMetrics().size()); assertTrue(0 != sm.getRegionMetrics().size());
cluster.stopRegionServer(serverName); cluster.stopRegionServer(serverName);
cluster.waitForRegionServerToStop(serverName, 1000); cluster.waitForRegionServerToStop(serverName, 1000);
compactTable(tableName); TEST_UTIL.compact(true);
TEST_UTIL.waitUntilNoRegionsInTransition(12000); TEST_UTIL.waitUntilNoRegionsInTransition(12000);
ssmap = admin.getClusterMetrics().getLiveServerMetrics(); ssmap = cluster.getClusterMetrics().getLiveServerMetrics();
assertEquals(REGION_SERVERS - 1, ssmap.size()); assertEquals(REGION_SERVERS - 1, ssmap.size());
sm = ssmap.get(serverName); sm = ssmap.get(serverName);
assertNull(sm); assertNull(sm);
@ -299,21 +303,91 @@ public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase {
cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000); cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000);
admin.balance(); admin.balance();
TEST_UTIL.waitUntilNoRegionsInTransition(120000); TEST_UTIL.waitUntilNoRegionsInTransition(120000);
ssmap = admin.getClusterMetrics().getLiveServerMetrics(); ssmap = cluster.getClusterMetrics().getLiveServerMetrics();
assertEquals(REGION_SERVERS, ssmap.size()); assertEquals(REGION_SERVERS, ssmap.size());
serverName = cluster.getRegionServer(REGION_SERVERS - 1).getServerName(); ServerName newServerName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1);
sm = ssmap.get(serverName); // Verify that the same region server has been started
assertTrue(ServerName.isSameAddress(serverName, newServerName));
sm = ssmap.get(newServerName);
assertNotNull(sm); assertNotNull(sm);
assertEquals(0, sm.getRegionMetrics().size()); assertEquals(0, sm.getRegionMetrics().size());
} }
private void compactTable(TableName tableName) throws IOException { @Test
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { public void testStressTestWithOnlyPrefetchCacheCostFunctionEnabled() throws Exception {
for (HRegion region : t.getRegionServer().getRegions(tableName)) { // Test the prefetch cache cost returned by the cost function when random servers are
region.compact(true); // restarted and only the PrefetchCacheCostFunction is enabled. Ensure that the prefetch cost
region.flush(true); // returned by the cost function is always between 0 and 1.
// Disable all other cost functions
Arrays.stream(FunctionCostKeys.values())
.forEach(functionCostKey -> conf.setFloat(functionCostKey.getValue(), 0.0f));
loadBalancer.loadConf(conf);
TEST_UTIL.startMiniCluster(REGION_SERVERS);
TEST_UTIL.getDFSCluster().waitClusterUp();
cluster = TEST_UTIL.getHBaseCluster();
admin = TEST_UTIL.getAdmin();
admin.balancerSwitch(false, true);
TableName tableName = TableName.valueOf("testTableOnlyPrefetchCacheCostFunctionEnabled");
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
admin.createTable(tableDescriptor, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName);
TEST_UTIL.compact(true);
// Verify that all the other cost functions except the PrefetchCacheCostFunction are disabled
Arrays.stream(FunctionCostKeys.values())
.forEach(functionCostKey -> verifyCostFunctionState(admin.getConfiguration(),
functionCostKey.getValue(), false));
verifyCostFunctionState(admin.getConfiguration(),
"hbase.master.balancer.stochastic.prefetchCacheCost", true);
admin.balancerSwitch(true, true);
admin.balance();
TEST_UTIL.waitUntilNoRegionsInTransition(120000);
Random rand = new Random();
for (int i = 0; i < 5; i++) {
int randomServerID = rand.nextInt(REGION_SERVERS);
ServerName sn = cluster.getClusterMetrics().getServersName().get(randomServerID);
cluster.stopRegionServer(sn);
cluster.waitForRegionServerToStop(sn, 1000);
TEST_UTIL.compact(true);
TEST_UTIL.waitUntilNoRegionsInTransition(12000);
cluster.startRegionServer(sn.getHostname(), sn.getPort());
cluster.waitForRegionServerToStart(sn.getHostname(), sn.getPort(), 1000);
admin.balance();
// Verify that the same server was restarted
verifyServerActive(sn);
assertEquals(REGION_SERVERS, cluster.getClusterMetrics().getLiveServerMetrics().size());
validatePrefetchCacheCost(loadBalancer.getCostFunctions());
}
}
private void verifyServerActive(ServerName serverName) throws Exception {
// The server id of the region server may change post restart. The only way to ensure that the
// same server has been restarted is by searching for the server address (host:port) in the
// active server list
boolean found = false;
for (ServerName sname : cluster.getClusterMetrics().getServersName()) {
if (ServerName.isSameAddress(sname, serverName)) {
found = true;
break;
}
}
assertTrue(found);
}
private void validatePrefetchCacheCost(List<CostFunction> cf) {
for (CostFunction c : cf) {
if (c.getMultiplier() > 0.0f) {
assertTrue(c.cost() >= 0.0 && c.cost() <= 1.0);
} }
} }
} }