HBASE-17281 FN should use datanode port from hdfs configuration
Signed-off-by: Francis Liu <toffer@apache.org>
This commit is contained in:
parent
c5d6e166de
commit
c6250ecc83
|
@ -32,11 +32,15 @@ import java.util.Map;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
@ -63,6 +67,11 @@ public class FavoredNodesManager {
|
||||||
|
|
||||||
private MasterServices masterServices;
|
private MasterServices masterServices;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Datanode port to be used for Favored Nodes.
|
||||||
|
*/
|
||||||
|
private int datanodeDataTransferPort;
|
||||||
|
|
||||||
public FavoredNodesManager(MasterServices masterServices) {
|
public FavoredNodesManager(MasterServices masterServices) {
|
||||||
this.masterServices = masterServices;
|
this.masterServices = masterServices;
|
||||||
this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
|
this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
|
||||||
|
@ -77,6 +86,19 @@ public class FavoredNodesManager {
|
||||||
primaryRSToRegionMap = snapshotOfRegionAssignment.getPrimaryToRegionInfoMap();
|
primaryRSToRegionMap = snapshotOfRegionAssignment.getPrimaryToRegionInfoMap();
|
||||||
secondaryRSToRegionMap = snapshotOfRegionAssignment.getSecondaryToRegionInfoMap();
|
secondaryRSToRegionMap = snapshotOfRegionAssignment.getSecondaryToRegionInfoMap();
|
||||||
teritiaryRSToRegionMap = snapshotOfRegionAssignment.getTertiaryToRegionInfoMap();
|
teritiaryRSToRegionMap = snapshotOfRegionAssignment.getTertiaryToRegionInfoMap();
|
||||||
|
datanodeDataTransferPort = getDataNodePort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getDataNodePort() {
|
||||||
|
HdfsConfiguration.init();
|
||||||
|
|
||||||
|
Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
|
||||||
|
|
||||||
|
int dnPort = NetUtils.createSocketAddr(
|
||||||
|
dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
|
||||||
|
LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
|
||||||
|
return dnPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) {
|
public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) {
|
||||||
|
@ -91,6 +113,24 @@ public class FavoredNodesManager {
|
||||||
return !regionInfo.isSystemTable();
|
return !regionInfo.isSystemTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This should only be used when sending FN information to the region servers. Instead of
|
||||||
|
* sending the region server port, we use the datanode port. This helps in centralizing the DN
|
||||||
|
* port logic in Master. The RS uses the port from the favored node list as hints.
|
||||||
|
*/
|
||||||
|
public synchronized List<ServerName> getFavoredNodesWithDNPort(HRegionInfo regionInfo) {
|
||||||
|
if (getFavoredNodes(regionInfo) == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ServerName> fnWithDNPort = Lists.newArrayList();
|
||||||
|
for (ServerName sn : getFavoredNodes(regionInfo)) {
|
||||||
|
fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
|
||||||
|
ServerName.NON_STARTCODE));
|
||||||
|
}
|
||||||
|
return fnWithDNPort;
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void updateFavoredNodes(Map<HRegionInfo, List<ServerName>> regionFNMap)
|
public synchronized void updateFavoredNodes(Map<HRegionInfo, List<ServerName>> regionFNMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
|
|
@ -806,7 +806,7 @@ public class AssignmentManager {
|
||||||
region, State.PENDING_OPEN, destination);
|
region, State.PENDING_OPEN, destination);
|
||||||
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
||||||
if (shouldAssignFavoredNodes(region)) {
|
if (shouldAssignFavoredNodes(region)) {
|
||||||
favoredNodes = server.getFavoredNodesManager().getFavoredNodes(region);
|
favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
|
||||||
}
|
}
|
||||||
regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
|
regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
|
||||||
region, favoredNodes));
|
region, favoredNodes));
|
||||||
|
@ -1114,7 +1114,7 @@ public class AssignmentManager {
|
||||||
try {
|
try {
|
||||||
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
||||||
if (shouldAssignFavoredNodes(region)) {
|
if (shouldAssignFavoredNodes(region)) {
|
||||||
favoredNodes = server.getFavoredNodesManager().getFavoredNodes(region);
|
favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
|
||||||
}
|
}
|
||||||
serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
|
serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
|
||||||
return; // we're done
|
return; // we're done
|
||||||
|
@ -1859,8 +1859,8 @@ public class AssignmentManager {
|
||||||
}
|
}
|
||||||
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
|
||||||
if (shouldAssignFavoredNodes(hri)) {
|
if (shouldAssignFavoredNodes(hri)) {
|
||||||
favoredNodes =
|
FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager();
|
||||||
((MasterServices)server).getFavoredNodesManager().getFavoredNodes(hri);
|
favoredNodes = fnm.getFavoredNodesWithDNPort(hri);
|
||||||
}
|
}
|
||||||
serverManager.sendRegionOpen(serverName, hri, favoredNodes);
|
serverManager.sendRegionOpen(serverName, hri, favoredNodes);
|
||||||
return; // we're done
|
return; // we're done
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||||
import org.apache.hadoop.hbase.master.RegionStates;
|
import org.apache.hadoop.hbase.master.RegionStates;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -86,6 +87,9 @@ public class TestTableFavoredNodes {
|
||||||
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
||||||
FavoredNodeLoadBalancer.class, LoadBalancer.class);
|
FavoredNodeLoadBalancer.class, LoadBalancer.class);
|
||||||
conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
|
conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
|
||||||
|
|
||||||
|
// This helps test if RS get the appropriate FN updates.
|
||||||
|
conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
|
||||||
TEST_UTIL.startMiniCluster(SLAVES);
|
TEST_UTIL.startMiniCluster(SLAVES);
|
||||||
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(WAIT_TIMEOUT);
|
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(WAIT_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
@ -289,6 +293,7 @@ public class TestTableFavoredNodes {
|
||||||
snRSMap.put(rst.getMaster().getServerName(), rst.getMaster());
|
snRSMap.put(rst.getMaster().getServerName(), rst.getMaster());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int dnPort = fnm.getDataNodePort();
|
||||||
RegionLocator regionLocator = admin.getConnection().getRegionLocator(tableName);
|
RegionLocator regionLocator = admin.getConnection().getRegionLocator(tableName);
|
||||||
for (HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) {
|
for (HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) {
|
||||||
|
|
||||||
|
@ -313,16 +318,15 @@ public class TestTableFavoredNodes {
|
||||||
regionServer.getFavoredNodesForRegion(regionInfo.getEncodedName());
|
regionServer.getFavoredNodesForRegion(regionInfo.getEncodedName());
|
||||||
assertNotNull("RS " + regionLocation.getServerName()
|
assertNotNull("RS " + regionLocation.getServerName()
|
||||||
+ " does not have FN for region: " + regionInfo, rsFavNodes);
|
+ " does not have FN for region: " + regionInfo, rsFavNodes);
|
||||||
|
assertEquals("Incorrect FN for region:" + regionInfo.getEncodedName() + " on server:" +
|
||||||
|
regionLocation.getServerName(), FavoredNodeAssignmentHelper.FAVORED_NODES_NUM,
|
||||||
|
rsFavNodes.length);
|
||||||
|
|
||||||
List<ServerName> fnFromRS = Lists.newArrayList();
|
// 4. Does DN port match all FN node list?
|
||||||
for (InetSocketAddress addr : rsFavNodes) {
|
for (ServerName sn : fnm.getFavoredNodesWithDNPort(regionInfo)) {
|
||||||
fnFromRS.add(ServerName.valueOf(addr.getHostName(), addr.getPort(),
|
assertEquals("FN should not have startCode, fnlist:" + fnList, -1, sn.getStartcode());
|
||||||
ServerName.NON_STARTCODE));
|
assertEquals("FN port should belong to DN port, fnlist:" + fnList, dnPort, sn.getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
fnFromRS.removeAll(fnList);
|
|
||||||
assertEquals("Inconsistent FN bet RS and Master, RS diff: " + fnFromRS
|
|
||||||
+ " List on master: " + fnList, 0, fnFromRS.size());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue