HBASE-25926 Cleanup MetaTableAccessor references in FavoredNodeBalancer related code (#3313)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
Author: Duo Zhang
Date: 2021-05-27 16:05:14 +08:00
Committed by: GitHub
parent a22e418cf6
commit 63141bf576
3 changed files with 150 additions and 153 deletions

FavoredNodeAssignmentHelper.java

@@ -28,17 +28,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -55,6 +54,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;
@@ -74,7 +75,6 @@ public class FavoredNodeAssignmentHelper {
   // This map serves as a cache for rack to sn lookups. The num of
   // region server entries might not match with that is in servers.
   private Map<String, String> regionServerToRackMap;
-  private Random random;
   private List<ServerName> servers;
   public static final byte [] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
   public final static short FAVORED_NODES_NUM = 3;
@@ -91,7 +91,6 @@ public class FavoredNodeAssignmentHelper {
     this.rackToRegionServerMap = new HashMap<>();
     this.regionServerToRackMap = new HashMap<>();
     this.uniqueRackList = new ArrayList<>();
-    this.random = new Random();
   }
   // Always initialize() when FavoredNodeAssignmentHelper is constructed.
@@ -120,80 +119,58 @@ public class FavoredNodeAssignmentHelper {
    * Update meta table with favored nodes info
    * @param regionToFavoredNodes map of RegionInfo's to their favored nodes
    * @param connection connection to be used
-   * @throws IOException
    */
   public static void updateMetaWithFavoredNodesInfo(
-      Map<RegionInfo, List<ServerName>> regionToFavoredNodes,
-      Connection connection) throws IOException {
+      Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
+      throws IOException {
     List<Put> puts = new ArrayList<>();
     for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
-      Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
+      Put put = makePut(entry.getKey(), entry.getValue());
       if (put != null) {
         puts.add(put);
       }
     }
-    MetaTableAccessor.putsToMetaTable(connection, puts);
-    LOG.info("Added " + puts.size() + " regions in META");
+    try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
+      table.put(puts);
+    }
+    LOG.info("Added " + puts.size() + " region favored nodes in META");
   }
   /**
    * Update meta table with favored nodes info
-   * @param regionToFavoredNodes
-   * @param conf
-   * @throws IOException
    */
   public static void updateMetaWithFavoredNodesInfo(
-      Map<RegionInfo, List<ServerName>> regionToFavoredNodes,
-      Configuration conf) throws IOException {
-    List<Put> puts = new ArrayList<>();
-    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
-      Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
-      if (put != null) {
-        puts.add(put);
-      }
-    }
+      Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
     // Write the region assignments to the meta table.
     // TODO: See above overrides take a Connection rather than a Configuration only the
     // Connection is a short circuit connection. That is not going to good in all cases, when
     // master and meta are not colocated. Fix when this favored nodes feature is actually used
     // someday.
-    try (Connection connection = ConnectionFactory.createConnection(conf)) {
-      try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
-        metaTable.put(puts);
-      }
-    }
-    LOG.info("Added " + puts.size() + " regions in META");
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
+    }
   }
-  /**
-   * Generates and returns a Put containing the region info for the catalog table and the servers
-   * @return Put object
-   */
-  private static Put makePutFromRegionInfo(RegionInfo regionInfo, List<ServerName> favoredNodeList)
-      throws IOException {
-    Put put = null;
-    if (favoredNodeList != null) {
-      long time = EnvironmentEdgeManager.currentTime();
-      put = MetaTableAccessor.makePutFromRegionInfo(regionInfo, time);
-      byte[] favoredNodes = getFavoredNodes(favoredNodeList);
-      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
-          .setRow(put.getRow())
-          .setFamily(HConstants.CATALOG_FAMILY)
-          .setQualifier(FAVOREDNODES_QUALIFIER)
-          .setTimestamp(time)
-          .setType(Type.Put)
-          .setValue(favoredNodes)
-          .build());
-      LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
-          favoredNodeList);
+  private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
+      throws IOException {
+    if (CollectionUtils.isEmpty(favoredNodeList)) {
+      return null;
     }
+    long time = EnvironmentEdgeManager.currentTime();
+    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
+    byte[] favoredNodes = getFavoredNodes(favoredNodeList);
+    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
+      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
+      .setType(Cell.Type.Put).setValue(favoredNodes).build());
+    LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
+      favoredNodeList);
     return put;
   }
   /**
+   * Convert PB bytes to ServerName.
    * @param favoredNodes The PB'ed bytes of favored nodes
    * @return the array of {@link ServerName} for the byte array of favored nodes.
-   * @throws IOException
    */
   public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
     FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
@@ -235,7 +212,7 @@ public class FavoredNodeAssignmentHelper {
       Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
     List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
     rackList.addAll(rackToRegionServerMap.keySet());
-    int rackIndex = random.nextInt(rackList.size());
+    int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
     int maxRackSize = 0;
     for (Map.Entry<String,List<ServerName>> r : rackToRegionServerMap.entrySet()) {
       if (r.getValue().size() > maxRackSize) {
@@ -244,7 +221,7 @@ public class FavoredNodeAssignmentHelper {
       }
     int numIterations = 0;
     // Initialize the current processing host index.
-    int serverIndex = random.nextInt(maxRackSize);
+    int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
     for (RegionInfo regionInfo : regions) {
       List<ServerName> currentServerList;
       String rackName;
@@ -589,7 +566,7 @@ public class FavoredNodeAssignmentHelper {
     }
     ServerName randomServer = null;
-    int randomIndex = random.nextInt(serversToChooseFrom.size());
+    int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
     int j = 0;
     for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
       if (j == randomIndex) {
@@ -610,14 +587,14 @@ public class FavoredNodeAssignmentHelper {
     return this.getOneRandomServer(rack, null);
   }
-  protected String getOneRandomRack(Set<String> skipRackSet) throws IOException {
+  String getOneRandomRack(Set<String> skipRackSet) throws IOException {
     if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
       throw new IOException("Cannot randomly pick another random server");
     }
     String randomRack;
     do {
-      int randomIndex = random.nextInt(this.uniqueRackList.size());
+      int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
       randomRack = this.uniqueRackList.get(randomIndex);
     } while (skipRackSet.contains(randomRack));
@@ -771,7 +748,7 @@ public class FavoredNodeAssignmentHelper {
   public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {
     List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
-    ServerName primary = servers.get(random.nextInt(servers.size()));
+    ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
     favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));
     Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
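
A minimal usage sketch of the new write path (not part of this commit): it assumes a running
cluster reachable through the given Configuration; the method name, table name and host names
below are placeholders, and the extra imports it would need (java.util.Arrays, java.util.Collections,
org.apache.hadoop.hbase.client.Get) are omitted. It persists favored nodes for one region through
the refactored updateMetaWithFavoredNodesInfo, then reads the info:fn cell back with an ordinary
Get on hbase:meta, the same row key the helper now builds with CatalogFamilyFormat.getMetaKeyForRegion.

  // Hypothetical round-trip of favored nodes through hbase:meta using the refactored helper.
  static ServerName[] writeAndReadFavoredNodes(Configuration conf) throws IOException {
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf("example_table")).build();
    List<ServerName> favoredNodes = Arrays.asList(
      ServerName.valueOf("host1", 16020, ServerName.NON_STARTCODE),
      ServerName.valueOf("host2", 16020, ServerName.NON_STARTCODE),
      ServerName.valueOf("host3", 16020, ServerName.NON_STARTCODE));
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Writes one Put per region straight to the meta Table, no MetaTableAccessor involved.
      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(
        Collections.singletonMap(region, favoredNodes), connection);
      try (Table meta = connection.getTable(TableName.META_TABLE_NAME)) {
        Result result = meta.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(region)));
        byte[] fn = result.getValue(HConstants.CATALOG_FAMILY,
          FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
        return FavoredNodeAssignmentHelper.getFavoredNodesList(fn);
      }
    }
  }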

SnapshotOfRegionAssignmentFromMeta.java

@@ -32,16 +32,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.hadoop.hbase.CatalogFamilyFormat;
-import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -98,78 +98,97 @@ public class SnapshotOfRegionAssignmentFromMeta {
     this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
   }
+  private void processMetaRecord(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return;
+    }
+    RegionLocations rl = CatalogFamilyFormat.getRegionLocations(result);
+    if (rl == null) {
+      return;
+    }
+    RegionInfo hri = rl.getRegionLocation(0).getRegion();
+    if (hri == null) {
+      return;
+    }
+    if (hri.getTable() == null) {
+      return;
+    }
+    if (disabledTables.contains(hri.getTable())) {
+      return;
+    }
+    // Are we to include split parents in the list?
+    if (excludeOfflinedSplitParents && hri.isSplit()) {
+      return;
+    }
+    HRegionLocation[] hrls = rl.getRegionLocations();
+    // Add the current assignment to the snapshot for all replicas
+    for (int i = 0; i < hrls.length; i++) {
+      if (hrls[i] == null) {
+        continue;
+      }
+      hri = hrls[i].getRegion();
+      if (hri == null) {
+        continue;
+      }
+      addAssignment(hri, hrls[i].getServerName());
+      addRegion(hri);
+    }
+    hri = rl.getRegionLocation(0).getRegion();
+    // the code below is to handle favored nodes
+    byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
+      FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
+    if (favoredNodes == null) {
+      return;
+    }
+    // Add the favored nodes into assignment plan
+    ServerName[] favoredServerList = FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
+    // Add the favored nodes into assignment plan
+    existingAssignmentPlan.updateFavoredNodesMap(hri, Arrays.asList(favoredServerList));
+    /*
+     * Typically there should be FAVORED_NODES_NUM favored nodes for a region in meta. If there is
+     * less than FAVORED_NODES_NUM, lets use as much as we can but log a warning.
+     */
+    if (favoredServerList.length != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
+      LOG.warn("Insufficient favored nodes for region " + hri + " fn: " +
+        Arrays.toString(favoredServerList));
+    }
+    for (int i = 0; i < favoredServerList.length; i++) {
+      if (i == PRIMARY.ordinal()) {
+        addPrimaryAssignment(hri, favoredServerList[i]);
+      }
+      if (i == SECONDARY.ordinal()) {
+        addSecondaryAssignment(hri, favoredServerList[i]);
+      }
+      if (i == TERTIARY.ordinal()) {
+        addTeritiaryAssignment(hri, favoredServerList[i]);
+      }
+    }
+  }
   /**
    * Initialize the region assignment snapshot by scanning the hbase:meta table
-   * @throws IOException
    */
   public void initialize() throws IOException {
-    LOG.info("Start to scan the hbase:meta for the current region assignment " +
-      "snappshot");
-    // TODO: at some point this code could live in the MetaTableAccessor
-    ClientMetaTableAccessor.Visitor v = new ClientMetaTableAccessor.Visitor() {
-      @Override
-      public boolean visit(Result result) throws IOException {
+    LOG.info("Start to scan the hbase:meta for the current region assignment " + "snappshot");
+    // Scan hbase:meta to pick up user regions
+    try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
+      ResultScanner scanner = metaTable.getScanner(HConstants.CATALOG_FAMILY)) {
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
         try {
-          if (result == null || result.isEmpty()) return true;
-          RegionLocations rl = CatalogFamilyFormat.getRegionLocations(result);
-          if (rl == null) return true;
-          RegionInfo hri = rl.getRegionLocation(0).getRegion();
-          if (hri == null) return true;
-          if (hri.getTable() == null) return true;
-          if (disabledTables.contains(hri.getTable())) {
-            return true;
-          }
-          // Are we to include split parents in the list?
-          if (excludeOfflinedSplitParents && hri.isSplit()) return true;
-          HRegionLocation[] hrls = rl.getRegionLocations();
-          // Add the current assignment to the snapshot for all replicas
-          for (int i = 0; i < hrls.length; i++) {
-            if (hrls[i] == null) continue;
-            hri = hrls[i].getRegion();
-            if (hri == null) continue;
-            addAssignment(hri, hrls[i].getServerName());
-            addRegion(hri);
-          }
-          hri = rl.getRegionLocation(0).getRegion();
-          // the code below is to handle favored nodes
-          byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
-            FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
-          if (favoredNodes == null) return true;
-          // Add the favored nodes into assignment plan
-          ServerName[] favoredServerList =
-            FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
-          // Add the favored nodes into assignment plan
-          existingAssignmentPlan.updateFavoredNodesMap(hri,
-            Arrays.asList(favoredServerList));
-          /*
-           * Typically there should be FAVORED_NODES_NUM favored nodes for a region in meta. If
-           * there is less than FAVORED_NODES_NUM, lets use as much as we can but log a warning.
-           */
-          if (favoredServerList.length != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
-            LOG.warn("Insufficient favored nodes for region " + hri + " fn: " + Arrays
-              .toString(favoredServerList));
-          }
-          for (int i = 0; i < favoredServerList.length; i++) {
-            if (i == PRIMARY.ordinal()) addPrimaryAssignment(hri, favoredServerList[i]);
-            if (i == SECONDARY.ordinal()) addSecondaryAssignment(hri, favoredServerList[i]);
-            if (i == TERTIARY.ordinal()) addTeritiaryAssignment(hri, favoredServerList[i]);
-          }
-          return true;
+          processMetaRecord(result);
         } catch (RuntimeException e) {
-          LOG.error("Catche remote exception " + e.getMessage() +
-            " when processing" + result);
+          LOG.error("Catch remote exception " + e.getMessage() + " when processing" + result);
           throw e;
         }
       }
-    };
-    // Scan hbase:meta to pick up user regions
-    MetaTableAccessor.fullScanRegions(connection, v);
-    //regionToRegionServerMap = regions;
-    LOG.info("Finished to scan the hbase:meta for the current region assignment" +
-      "snapshot");
+    }
+    LOG.info("Finished to scan the hbase:meta for the current region assignment" + "snapshot");
   }
   private void addRegion(RegionInfo regionInfo) {
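
The Visitor-based full scan is gone; below is a hedged, standalone sketch of the same scanner
pattern the new initialize() uses, assuming an open Connection. The method name and the returned
map are illustrative only, not HBase API.

  // Hypothetical helper mirroring the new scan loop: read every row of hbase:meta's catalog
  // family and decode the per-replica assignments with CatalogFamilyFormat.
  static Map<RegionInfo, ServerName> scanMetaAssignments(Connection connection) throws IOException {
    Map<RegionInfo, ServerName> assignments = new HashMap<>();
    try (Table meta = connection.getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = meta.getScanner(HConstants.CATALOG_FAMILY)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        if (result.isEmpty()) {
          continue;
        }
        RegionLocations locations = CatalogFamilyFormat.getRegionLocations(result);
        if (locations == null) {
          continue;
        }
        // One meta row can carry several replicas; record the server of each one present.
        for (HRegionLocation location : locations.getRegionLocations()) {
          if (location != null && location.getRegion() != null
            && location.getServerName() != null) {
            assignments.put(location.getRegion(), location.getServerName());
          }
        }
      }
    }
    return assignments;
  }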

TestFavoredNodeAssignmentHelper.java

@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -31,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -39,8 +40,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.master.RackManager;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Triple;
 import org.junit.BeforeClass;
@@ -54,7 +55,7 @@ import org.mockito.Mockito;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-@Category({MasterTests.class, LargeTests.class})
+@Category({ MasterTests.class, MediumTests.class })
 public class TestFavoredNodeAssignmentHelper {
   @ClassRule
@@ -71,35 +72,36 @@ public class TestFavoredNodeAssignmentHelper {
   @Rule
   public TestName name = new TestName();
+  private static String getRack(int index) {
+    if (index < 10) {
+      return "rack1";
+    } else if (index < 20) {
+      return "rack2";
+    } else if (index < 30) {
+      return "rack3";
+    } else {
+      return RackManager.UNKNOWN_RACK;
+    }
+  }
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     // Set up some server -> rack mappings
     // Have three racks in the cluster with 10 hosts each.
+    when(rackManager.getRack(any(ServerName.class))).then(invocation -> {
+      ServerName sn = invocation.getArgument(0, ServerName.class);
+      try {
+        int i = Integer.parseInt(sn.getHostname().substring("foo".length()));
+        return getRack(i);
+      } catch (NumberFormatException e) {
+        return RackManager.UNKNOWN_RACK;
+      }
+    });
     for (int i = 0; i < 40; i++) {
-      ServerName server = ServerName.valueOf("foo" + i + ":1234", -1);
-      if (i < 10) {
-        Mockito.when(rackManager.getRack(server)).thenReturn("rack1");
-        if (rackToServers.get("rack1") == null) {
-          List<ServerName> servers = new ArrayList<>();
-          rackToServers.put("rack1", servers);
-        }
-        rackToServers.get("rack1").add(server);
-      }
-      if (i >= 10 && i < 20) {
-        Mockito.when(rackManager.getRack(server)).thenReturn("rack2");
-        if (rackToServers.get("rack2") == null) {
-          List<ServerName> servers = new ArrayList<>();
-          rackToServers.put("rack2", servers);
-        }
-        rackToServers.get("rack2").add(server);
-      }
-      if (i >= 20 && i < 30) {
-        Mockito.when(rackManager.getRack(server)).thenReturn("rack3");
-        if (rackToServers.get("rack3") == null) {
-          List<ServerName> servers = new ArrayList<>();
-          rackToServers.put("rack3", servers);
-        }
-        rackToServers.get("rack3").add(server);
+      ServerName server = ServerName.valueOf("foo" + i, 1234, System.currentTimeMillis());
+      String rack = getRack(i);
+      if (!rack.equals(RackManager.UNKNOWN_RACK)) {
+        rackToServers.computeIfAbsent(rack, k -> new ArrayList<>()).add(server);
       }
       servers.add(server);
     }
@@ -107,7 +109,7 @@ public class TestFavoredNodeAssignmentHelper {
   // The tests decide which racks to work with, and how many machines to
   // work with from any given rack
-  // Return a rondom 'count' number of servers from 'rack'
+  // Return a random 'count' number of servers from 'rack'
   private static List<ServerName> getServersFromRack(Map<String, Integer> rackToServerCount) {
     List<ServerName> chosenServers = new ArrayList<>();
     for (Map.Entry<String, Integer> entry : rackToServerCount.entrySet()) {
@@ -123,11 +125,10 @@ public class TestFavoredNodeAssignmentHelper {
   public void testSmallCluster() {
     // Test the case where we cannot assign favored nodes (because the number
     // of nodes in the cluster is too less)
-    Map<String,Integer> rackToServerCount = new HashMap<>();
+    Map<String, Integer> rackToServerCount = new HashMap<>();
     rackToServerCount.put("rack1", 2);
     List<ServerName> servers = getServersFromRack(rackToServerCount);
-    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers,
-        new Configuration());
+    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
    helper.initialize();
     assertFalse(helper.canPlaceFavoredNodes());
   }
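
The test no longer stubs rackManager.getRack for every individual ServerName; below is a hedged,
standalone sketch of the Answer-based stubbing it switches to. The factory method name is made up;
the "foo<N>" hostname scheme is the one the test itself uses.

  // Hypothetical factory: one Mockito Answer derives the rack from the hostname, so any
  // ServerName following the test's "foo<N>" naming resolves without per-server stubbing.
  static RackManager mockRackManager() {
    RackManager rackManager = Mockito.mock(RackManager.class);
    Mockito.when(rackManager.getRack(Mockito.any(ServerName.class))).then(invocation -> {
      ServerName sn = invocation.getArgument(0, ServerName.class);
      try {
        int i = Integer.parseInt(sn.getHostname().substring("foo".length()));
        // Same buckets as the test's getRack(int): 0-9, 10-19, 20-29, otherwise unknown.
        return i < 10 ? "rack1" : i < 20 ? "rack2" : i < 30 ? "rack3" : RackManager.UNKNOWN_RACK;
      } catch (NumberFormatException e) {
        return RackManager.UNKNOWN_RACK;
      }
    });
    return rackManager;
  }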