HDFS-9004. Add upgrade domain to DatanodeInfo. Contributed by Ming Ma (via Lei (Eddy) Xu).
Change-Id: I887c66578eebd61acc34b94f18da6e6851c609f4
(cherry picked from commit 3a9c7076e8
)
This commit is contained in:
parent
a38dceda66
commit
0095936a6a
|
@ -53,7 +53,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
private String location = NetworkTopology.DEFAULT_RACK;
|
private String location = NetworkTopology.DEFAULT_RACK;
|
||||||
private String softwareVersion;
|
private String softwareVersion;
|
||||||
private List<String> dependentHostNames = new LinkedList<String>();
|
private List<String> dependentHostNames = new LinkedList<String>();
|
||||||
|
private String upgradeDomain;
|
||||||
|
|
||||||
// Datanode administrative states
|
// Datanode administrative states
|
||||||
public enum AdminStates {
|
public enum AdminStates {
|
||||||
|
@ -95,6 +95,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
this.xceiverCount = from.getXceiverCount();
|
this.xceiverCount = from.getXceiverCount();
|
||||||
this.location = from.getNetworkLocation();
|
this.location = from.getNetworkLocation();
|
||||||
this.adminState = from.getAdminState();
|
this.adminState = from.getAdminState();
|
||||||
|
this.upgradeDomain = from.getUpgradeDomain();
|
||||||
}
|
}
|
||||||
|
|
||||||
public DatanodeInfo(DatanodeID nodeID) {
|
public DatanodeInfo(DatanodeID nodeID) {
|
||||||
|
@ -120,12 +121,13 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
final long capacity, final long dfsUsed, final long remaining,
|
final long capacity, final long dfsUsed, final long remaining,
|
||||||
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
|
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
|
||||||
final long lastUpdate, final long lastUpdateMonotonic,
|
final long lastUpdate, final long lastUpdateMonotonic,
|
||||||
final int xceiverCount, final AdminStates adminState) {
|
final int xceiverCount, final AdminStates adminState,
|
||||||
|
final String upgradeDomain) {
|
||||||
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
|
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
|
||||||
nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
|
nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
|
||||||
nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
|
nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
|
||||||
cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
|
cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
|
||||||
xceiverCount, location, adminState);
|
xceiverCount, location, adminState, upgradeDomain);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Constructor */
|
/** Constructor */
|
||||||
|
@ -137,6 +139,22 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
final long lastUpdate, final long lastUpdateMonotonic,
|
final long lastUpdate, final long lastUpdateMonotonic,
|
||||||
final int xceiverCount, final String networkLocation,
|
final int xceiverCount, final String networkLocation,
|
||||||
final AdminStates adminState) {
|
final AdminStates adminState) {
|
||||||
|
this(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
|
||||||
|
ipcPort, capacity, dfsUsed, remaining, blockPoolUsed, cacheCapacity,
|
||||||
|
cacheUsed, lastUpdate, lastUpdateMonotonic, xceiverCount,
|
||||||
|
networkLocation, adminState, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Constructor */
|
||||||
|
public DatanodeInfo(final String ipAddr, final String hostName,
|
||||||
|
final String datanodeUuid, final int xferPort, final int infoPort,
|
||||||
|
final int infoSecurePort, final int ipcPort,
|
||||||
|
final long capacity, final long dfsUsed, final long remaining,
|
||||||
|
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
|
||||||
|
final long lastUpdate, final long lastUpdateMonotonic,
|
||||||
|
final int xceiverCount, final String networkLocation,
|
||||||
|
final AdminStates adminState,
|
||||||
|
final String upgradeDomain) {
|
||||||
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
|
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
|
||||||
infoSecurePort, ipcPort);
|
infoSecurePort, ipcPort);
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
|
@ -150,6 +168,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
this.xceiverCount = xceiverCount;
|
this.xceiverCount = xceiverCount;
|
||||||
this.location = networkLocation;
|
this.location = networkLocation;
|
||||||
this.adminState = adminState;
|
this.adminState = adminState;
|
||||||
|
this.upgradeDomain = upgradeDomain;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Network location name */
|
/** Network location name */
|
||||||
|
@ -298,6 +317,16 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
this.location = NodeBase.normalize(location);
|
this.location = NodeBase.normalize(location);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Sets the upgrade domain */
|
||||||
|
public void setUpgradeDomain(String upgradeDomain) {
|
||||||
|
this.upgradeDomain = upgradeDomain;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** upgrade domain */
|
||||||
|
public String getUpgradeDomain() {
|
||||||
|
return upgradeDomain;
|
||||||
|
}
|
||||||
|
|
||||||
/** Add a hostname to a list of network dependencies */
|
/** Add a hostname to a list of network dependencies */
|
||||||
public void addDependentHostName(String hostname) {
|
public void addDependentHostName(String hostname) {
|
||||||
dependentHostNames.add(hostname);
|
dependentHostNames.add(hostname);
|
||||||
|
@ -339,6 +368,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
||||||
buffer.append("Rack: "+location+"\n");
|
buffer.append("Rack: "+location+"\n");
|
||||||
}
|
}
|
||||||
|
if (upgradeDomain != null) {
|
||||||
|
buffer.append("Upgrade domain: "+ upgradeDomain +"\n");
|
||||||
|
}
|
||||||
buffer.append("Decommission Status : ");
|
buffer.append("Decommission Status : ");
|
||||||
if (isDecommissioned()) {
|
if (isDecommissioned()) {
|
||||||
buffer.append("Decommissioned\n");
|
buffer.append("Decommissioned\n");
|
||||||
|
@ -378,6 +410,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
||||||
buffer.append(" "+location);
|
buffer.append(" "+location);
|
||||||
}
|
}
|
||||||
|
if (upgradeDomain != null) {
|
||||||
|
buffer.append(" " + upgradeDomain);
|
||||||
|
}
|
||||||
if (isDecommissioned()) {
|
if (isDecommissioned()) {
|
||||||
buffer.append(" DD");
|
buffer.append(" DD");
|
||||||
} else if (isDecommissionInProgress()) {
|
} else if (isDecommissionInProgress()) {
|
||||||
|
|
|
@ -140,6 +140,9 @@ public class PBHelperClient {
|
||||||
if (info.getNetworkLocation() != null) {
|
if (info.getNetworkLocation() != null) {
|
||||||
builder.setLocation(info.getNetworkLocation());
|
builder.setLocation(info.getNetworkLocation());
|
||||||
}
|
}
|
||||||
|
if (info.getUpgradeDomain() != null) {
|
||||||
|
builder.setUpgradeDomain(info.getUpgradeDomain());
|
||||||
|
}
|
||||||
builder
|
builder
|
||||||
.setId(convert((DatanodeID) info))
|
.setId(convert((DatanodeID) info))
|
||||||
.setCapacity(info.getCapacity())
|
.setCapacity(info.getCapacity())
|
||||||
|
|
|
@ -241,7 +241,8 @@ class JsonUtilClient {
|
||||||
getLong(m, "lastUpdateMonotonic", 0l),
|
getLong(m, "lastUpdateMonotonic", 0l),
|
||||||
getInt(m, "xceiverCount", 0),
|
getInt(m, "xceiverCount", 0),
|
||||||
getString(m, "networkLocation", ""),
|
getString(m, "networkLocation", ""),
|
||||||
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
|
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")),
|
||||||
|
getString(m, "upgradeDomain", ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert an Object[] to a DatanodeInfo[]. */
|
/** Convert an Object[] to a DatanodeInfo[]. */
|
||||||
|
|
|
@ -98,6 +98,7 @@ message DatanodeInfoProto {
|
||||||
optional uint64 cacheCapacity = 11 [default = 0];
|
optional uint64 cacheCapacity = 11 [default = 0];
|
||||||
optional uint64 cacheUsed = 12 [default = 0];
|
optional uint64 cacheUsed = 12 [default = 0];
|
||||||
optional uint64 lastUpdateMonotonic = 13 [default = 0];
|
optional uint64 lastUpdateMonotonic = 13 [default = 0];
|
||||||
|
optional string upgradeDomain = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -595,7 +595,8 @@ public class PBHelper {
|
||||||
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
|
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
|
||||||
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
|
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
|
||||||
di.getLastUpdate(), di.getLastUpdateMonotonic(),
|
di.getLastUpdate(), di.getLastUpdateMonotonic(),
|
||||||
di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
|
di.getXceiverCount(), PBHelper.convert(di.getAdminState()),
|
||||||
|
di.hasUpgradeDomain() ? di.getUpgradeDomain() : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
|
static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
|
||||||
|
|
|
@ -5917,6 +5917,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
.put("estimatedCapacityLostTotal",
|
.put("estimatedCapacityLostTotal",
|
||||||
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
||||||
}
|
}
|
||||||
|
if (node.getUpgradeDomain() != null) {
|
||||||
|
innerinfo.put("upgradeDomain", node.getUpgradeDomain());
|
||||||
|
}
|
||||||
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
|
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
|
||||||
}
|
}
|
||||||
return JSON.toString(info);
|
return JSON.toString(info);
|
||||||
|
|
|
@ -169,6 +169,9 @@ public class JsonUtil {
|
||||||
m.put("xceiverCount", datanodeinfo.getXceiverCount());
|
m.put("xceiverCount", datanodeinfo.getXceiverCount());
|
||||||
m.put("networkLocation", datanodeinfo.getNetworkLocation());
|
m.put("networkLocation", datanodeinfo.getNetworkLocation());
|
||||||
m.put("adminState", datanodeinfo.getAdminState().name());
|
m.put("adminState", datanodeinfo.getAdminState().name());
|
||||||
|
if (datanodeinfo.getUpgradeDomain() != null) {
|
||||||
|
m.put("upgradeDomain", datanodeinfo.getUpgradeDomain());
|
||||||
|
}
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
|
@ -76,6 +78,15 @@ public class TestNameNodeMXBean {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Set upgrade domain on the first DN.
|
||||||
|
String upgradeDomain = "abcd";
|
||||||
|
DatanodeManager dm = cluster.getNameNode().getNamesystem().
|
||||||
|
getBlockManager().getDatanodeManager();
|
||||||
|
DatanodeDescriptor dd = dm.getDatanode(
|
||||||
|
cluster.getDataNodes().get(0).getDatanodeId());
|
||||||
|
dd.setUpgradeDomain(upgradeDomain);
|
||||||
|
String dnXferAddrWithUpgradeDomainSet = dd.getXferAddr();
|
||||||
|
|
||||||
FSNamesystem fsn = cluster.getNameNode().namesystem;
|
FSNamesystem fsn = cluster.getNameNode().namesystem;
|
||||||
|
|
||||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
@ -125,6 +136,15 @@ public class TestNameNodeMXBean {
|
||||||
assertTrue(((Long)liveNode.get("capacity")) > 0);
|
assertTrue(((Long)liveNode.get("capacity")) > 0);
|
||||||
assertTrue(liveNode.containsKey("numBlocks"));
|
assertTrue(liveNode.containsKey("numBlocks"));
|
||||||
assertTrue(((Long)liveNode.get("numBlocks")) == 0);
|
assertTrue(((Long)liveNode.get("numBlocks")) == 0);
|
||||||
|
// a. By default the upgrade domain isn't defined on any DN.
|
||||||
|
// b. If the upgrade domain is set on a DN, JMX should have the same
|
||||||
|
// value.
|
||||||
|
String xferAddr = (String)liveNode.get("xferaddr");
|
||||||
|
if (!xferAddr.equals(dnXferAddrWithUpgradeDomainSet)) {
|
||||||
|
assertTrue(!liveNode.containsKey("upgradeDomain"));
|
||||||
|
} else {
|
||||||
|
assertTrue(liveNode.get("upgradeDomain").equals(upgradeDomain));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
assertEquals(fsn.getLiveNodes(), alivenodeinfo);
|
assertEquals(fsn.getLiveNodes(), alivenodeinfo);
|
||||||
// get attribute deadnodeinfo
|
// get attribute deadnodeinfo
|
||||||
|
|
Loading…
Reference in New Issue