HBASE-23078 BaseLoadBalancer should consider region replicas when randomAssignment and roundRobinAssignment (#663)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
  parent 9f7c73caa2
  commit 9f703fc3b2
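The gist of the patch: callers no longer pass a hasRegionReplica flag into createCluster; the balancer derives it from the table descriptors, so roundRobinAssignment, randomAssignment and retainAssignment all build a replica-aware Cluster. Below is a minimal, self-contained sketch of that detection step, assuming the master-side types used in the hunks that follow; the helper class and method name here are illustrative only — in the actual patch this loop is inlined in BaseLoadBalancer.createCluster().

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;

/** Sketch only: condensed from the createCluster() change in the diff below. */
final class RegionReplicaCheck {
  // Hypothetical helper; the real code performs this check inside BaseLoadBalancer.createCluster().
  static boolean hasRegionReplica(MasterServices services, Collection<RegionInfo> regions)
      throws HBaseIOException {
    try {
      if (services == null || services.getTableDescriptors() == null) {
        return false;
      }
      Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
      for (RegionInfo regionInfo : regions) {
        TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
        if (td != null && td.getRegionReplication() > 1) {
          return true; // one table with replication > 1 is enough to build a replica-aware Cluster
        }
      }
      return false;
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }
  }
}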
@@ -37,11 +37,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -746,26 +744,6 @@ public class RegionStates {
     return serverNode;
   }
 
-  public boolean isReplicaAvailableForRegion(final RegionInfo info) {
-    // if the region info itself is a replica return true.
-    if (!RegionReplicaUtil.isDefaultReplica(info)) {
-      return true;
-    }
-    // iterate the regionsMap for the given region name. If there are replicas it should
-    // list them in order.
-    for (RegionStateNode node : regionsMap.tailMap(info.getRegionName()).values()) {
-      if (!node.getTable().equals(info.getTable())
-          || !ServerRegionReplicaUtil.isReplicasForSameRegion(info, node.getRegionInfo())) {
-        break;
-      } else if (!RegionReplicaUtil.isDefaultReplica(node.getRegionInfo())) {
-        // we have replicas
-        return true;
-      }
-    }
-    // we don have replicas
-    return false;
-  }
-
   public ServerStateNode removeRegionFromServer(final ServerName serverName,
       final RegionStateNode regionNode) {
     ServerStateNode serverNode = getOrCreateServer(serverName);

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -48,12 +49,11 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
@@ -1263,7 +1263,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       return assignments;
     }
 
-    Cluster cluster = createCluster(servers, regions, false);
+    Cluster cluster = createCluster(servers, regions);
     List<RegionInfo> unassignedRegions = new ArrayList<>();
 
     roundRobinAssignment(cluster, regions, unassignedRegions,
@@ -1319,8 +1319,24 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     return assignments;
   }
 
-  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
-      boolean hasRegionReplica) {
+  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
+      throws HBaseIOException {
+    boolean hasRegionReplica = false;
+    try {
+      if (services != null && services.getTableDescriptors() != null) {
+        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
+        for (RegionInfo regionInfo : regions) {
+          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
+          if (td != null && td.getRegionReplication() > 1) {
+            hasRegionReplica = true;
+            break;
+          }
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HBaseIOException(ioe);
+    }
+
     // Get the snapshot of the current assignments for the regions in question, and then create
     // a cluster out of it. Note that we might have replicas already assigned to some servers
     // earlier. So we want to get the snapshot to see those assignments, but this will only contain
@@ -1380,7 +1396,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     final List<ServerName> finalServers = idleServers.isEmpty() ?
         servers : idleServers;
     List<RegionInfo> regions = Lists.newArrayList(regionInfo);
-    Cluster cluster = createCluster(finalServers, regions, false);
+    Cluster cluster = createCluster(finalServers, regions);
     return randomAssignment(cluster, regionInfo, finalServers);
   }
 
@@ -1452,21 +1468,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
     int numRandomAssignments = 0;
     int numRetainedAssigments = 0;
-    boolean hasRegionReplica = false;
     for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
       RegionInfo region = entry.getKey();
       ServerName oldServerName = entry.getValue();
-      // In the current set of regions even if one has region replica let us go with
-      // getting the entire snapshot
-      if (this.services != null) { // for tests
-        AssignmentManager am = this.services.getAssignmentManager();
-        if (am != null) {
-          RegionStates states = am.getRegionStates();
-          if (!hasRegionReplica && states != null && states.isReplicaAvailableForRegion(region)) {
-            hasRegionReplica = true;
-          }
-        }
-      }
       List<ServerName> localServers = new ArrayList<>();
       if (oldServerName != null) {
         localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
@@ -1506,7 +1510,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
     // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
     if (randomAssignRegions.size() > 0) {
-      Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
+      Cluster cluster = createCluster(servers, regions.keySet());
       for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
         ServerName sn = entry.getKey();
         for (RegionInfo region : entry.getValue()) {

@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -147,15 +150,7 @@ public class TestMasterOperationsForRegionReplicas {
 
     List<RegionInfo> hris = MetaTableAccessor.getTableRegions(ADMIN.getConnection(), tableName);
     assertEquals(numRegions * numReplica, hris.size());
-    // check that the master created expected number of RegionState objects
-    for (int i = 0; i < numRegions; i++) {
-      for (int j = 0; j < numReplica; j++) {
-        RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
-        RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
-          .getRegionStates().getRegionState(replica);
-        assertNotNull(state);
-      }
-    }
+    assertRegionStateNotNull(hris, numRegions, numReplica);
 
     List<Result> metaRows = MetaTableAccessor.fullScanRegions(ADMIN.getConnection());
     int numRows = 0;
@@ -184,14 +179,26 @@
     TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
     TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
-    for (int i = 0; i < numRegions; i++) {
-      for (int j = 0; j < numReplica; j++) {
-        RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
-        RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
-          .getRegionStates().getRegionState(replica);
-        assertNotNull(state);
-      }
-    }
+    assertRegionStateNotNull(hris, numRegions, numReplica);
     validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
       ADMIN.getConnection());
+
+    // Now shut the whole cluster down, and verify the assignments are kept so that the
+    // availability constraints are met. MiniHBaseCluster chooses arbitrary ports on each
+    // restart. This messes with our being able to test that we retain locality. Therefore,
+    // figure current cluster ports and pass them in on next cluster start so new cluster comes
+    // up at same coordinates -- and the assignment retention logic has a chance to cut in.
+    List<Integer> rsports = new ArrayList<>();
+    for (JVMClusterUtil.RegionServerThread rst : TEST_UTIL.getHBaseCluster()
+        .getLiveRegionServerThreads()) {
+      rsports.add(rst.getRegionServer().getRpcServer().getListenerAddress().getPort());
+    }
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    StartMiniClusterOption option =
+      StartMiniClusterOption.builder().numRegionServers(numSlaves).rsPorts(rsports).build();
+    TEST_UTIL.startMiniHBaseCluster(option);
+    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
+      ADMIN.getConnection());
@@ -255,6 +262,19 @@
       }
     }
 
+  private void assertRegionStateNotNull(List<RegionInfo> hris, int numRegions, int numReplica) {
+    // check that the master created expected number of RegionState objects
+    for (int i = 0; i < numRegions; i++) {
+      for (int j = 0; j < numReplica; j++) {
+        RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
+        RegionState state =
+          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+            .getRegionState(replica);
+        assertNotNull(state);
+      }
+    }
+  }
+
   @Test
   @Ignore("Enable when we have support for alter_table- HBASE-10361")
   public void testIncompleteMetaTableReplicaInformation() throws Exception {

@@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -29,11 +32,14 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -125,6 +131,7 @@ public class TestSCPBase {
       long procId = getSCPProcId(procExec);
       ProcedureTestingUtility.waitProcedure(procExec, procId);
     }
+    assertReplicaDistributed(t);
     assertEquals(count, util.countRows(t));
     assertEquals(checksum, util.checksumRows(t));
   }
@@ -135,6 +142,36 @@
     return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
   }
 
+  private void assertReplicaDistributed(Table t) throws IOException {
+    if (t.getDescriptor().getRegionReplication() <= 1) {
+      return;
+    }
+    // Assert all data came back.
+    List<RegionInfo> regionInfos = new ArrayList<>();
+    for (RegionServerThread rs : this.util.getMiniHBaseCluster().getRegionServerThreads()) {
+      regionInfos.clear();
+      for (Region r : rs.getRegionServer().getRegions(t.getName())) {
+        LOG.info("The region is " + r.getRegionInfo() + " the location is " +
+          rs.getRegionServer().getServerName());
+        if (contains(regionInfos, r.getRegionInfo())) {
+          LOG.error("Am exiting");
+          fail("Replica regions should be assigned to different region servers");
+        } else {
+          regionInfos.add(r.getRegionInfo());
+        }
+      }
+    }
+  }
+
+  private boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
+    for (RegionInfo info : regionInfos) {
+      if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   protected Table createTable(final TableName tableName) throws IOException {
     final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
       HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, getRegionReplication());

@@ -24,10 +24,12 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -37,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -64,31 +65,27 @@ public class TestRegionReplicasWithRestartScenarios {
 
   private static final int NB_SERVERS = 3;
   private Table table;
+  private TableName tableName;
 
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
   private static final byte[] f = HConstants.CATALOG_FAMILY;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
     // Reduce the hdfs block size and prefetch to trigger the file-link reopen
     // when the file is moved to archive (e.g. compaction)
     HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
     HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
     HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
-    HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", 3);
+    HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", NB_SERVERS);
     HTU.startMiniCluster(NB_SERVERS);
   }
 
   @Before
   public void before() throws IOException {
-    TableName tableName = TableName.valueOf(this.name.getMethodName());
-    // Create table then get the single region for our new table.
-    this.table = createTableDirectlyFromHTD(tableName);
+    this.tableName = TableName.valueOf(this.name.getMethodName());
+    this.table = createTableDirectlyFromHTD(this.tableName);
   }
 
   @After
   public void after() throws IOException {
     this.table.close();
+    HTU.deleteTable(this.tableName);
   }
 
   private static Table createTableDirectlyFromHTD(final TableName tableName) throws IOException {
@@ -125,6 +122,20 @@ public class TestRegionReplicasWithRestartScenarios {
 
   @Test
   public void testRegionReplicasCreated() throws Exception {
+    assertReplicaDistributed();
+  }
+
+  @Test
+  public void testWhenRestart() throws Exception {
+    ServerName serverName = getRS().getServerName();
+    HTU.getHBaseCluster().stopRegionServer(serverName);
+    HTU.getHBaseCluster().waitForRegionServerToStop(serverName, 60000);
+    HTU.getHBaseCluster().startRegionServerAndWait(60000);
+    HTU.waitTableAvailable(this.tableName);
+    assertReplicaDistributed();
+  }
+
+  private void assertReplicaDistributed() throws Exception {
     Collection<HRegion> onlineRegions = getRS().getOnlineRegionsLocalContext();
     boolean res = checkDuplicates(onlineRegions);
     assertFalse(res);
@@ -150,7 +161,7 @@
             RegionReplicaUtil.getRegionInfoForDefaultReplica(actualRegion.getRegionInfo()))) {
           i++;
           if (i > 1) {
-            LOG.info("Duplicate found " + actualRegion.getRegionInfo() + " " +
+            LOG.warn("Duplicate found {} and {}", actualRegion.getRegionInfo(),
               region.getRegionInfo());
             assertTrue(Bytes.equals(region.getRegionInfo().getStartKey(),
               actualRegion.getRegionInfo().getStartKey()));