HDFS-16540. Data locality is lost when DataNode pod restarts in kubernetes (#4170)
This reverts the previous commit 4e47eb66d1
undone so I could reapply with the '.' after the HDFS-16540 as is done
in all other commits.
This commit is contained in:
parent
4e47eb66d1
commit
9ed8d60511
Binary file not shown.
|
@ -1171,6 +1171,7 @@ public class DatanodeManager {
|
||||||
nodeN = null;
|
nodeN = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean updateHost2DatanodeMap = false;
|
||||||
if (nodeS != null) {
|
if (nodeS != null) {
|
||||||
if (nodeN == nodeS) {
|
if (nodeN == nodeS) {
|
||||||
// The same datanode has been just restarted to serve the same data
|
// The same datanode has been just restarted to serve the same data
|
||||||
|
@ -1189,7 +1190,11 @@ public class DatanodeManager {
|
||||||
nodes with its data cleared (or user can just remove the StorageID
|
nodes with its data cleared (or user can just remove the StorageID
|
||||||
value in "VERSION" file under the data directory of the datanode,
|
value in "VERSION" file under the data directory of the datanode,
|
||||||
but this is might not work if VERSION file format has changed
|
but this is might not work if VERSION file format has changed
|
||||||
*/
|
*/
|
||||||
|
// Check if nodeS's host information is same as nodeReg's, if not,
|
||||||
|
// it needs to update host2DatanodeMap accordringly.
|
||||||
|
updateHost2DatanodeMap = !nodeS.getXferAddr().equals(nodeReg.getXferAddr());
|
||||||
|
|
||||||
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
|
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
|
||||||
+ " is replaced by " + nodeReg + " with the same storageID "
|
+ " is replaced by " + nodeReg + " with the same storageID "
|
||||||
+ nodeReg.getDatanodeUuid());
|
+ nodeReg.getDatanodeUuid());
|
||||||
|
@ -1199,6 +1204,11 @@ public class DatanodeManager {
|
||||||
try {
|
try {
|
||||||
// update cluster map
|
// update cluster map
|
||||||
getNetworkTopology().remove(nodeS);
|
getNetworkTopology().remove(nodeS);
|
||||||
|
|
||||||
|
// Update Host2DatanodeMap
|
||||||
|
if (updateHost2DatanodeMap) {
|
||||||
|
getHost2DatanodeMap().remove(nodeS);
|
||||||
|
}
|
||||||
if(shouldCountVersion(nodeS)) {
|
if(shouldCountVersion(nodeS)) {
|
||||||
decrementVersionCount(nodeS.getSoftwareVersion());
|
decrementVersionCount(nodeS.getSoftwareVersion());
|
||||||
}
|
}
|
||||||
|
@ -1217,6 +1227,11 @@ public class DatanodeManager {
|
||||||
nodeS.setDependentHostNames(
|
nodeS.setDependentHostNames(
|
||||||
getNetworkDependenciesWithDefault(nodeS));
|
getNetworkDependenciesWithDefault(nodeS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (updateHost2DatanodeMap) {
|
||||||
|
getHost2DatanodeMap().add(nodeS);
|
||||||
|
}
|
||||||
|
|
||||||
getNetworkTopology().add(nodeS);
|
getNetworkTopology().add(nodeS);
|
||||||
resolveUpgradeDomain(nodeS);
|
resolveUpgradeDomain(nodeS);
|
||||||
|
|
||||||
|
|
|
@ -138,6 +138,35 @@ public class TestDatanodeManager {
|
||||||
mapToCheck.get("version1").intValue(), 1);
|
mapToCheck.get("version1").intValue(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test checks that if a node is re-registered with a different ip, its
|
||||||
|
* host2DatanodeMap is correctly updated with the new ip.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testHost2NodeMapCorrectAfterReregister()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
//Create the DatanodeManager which will be tested
|
||||||
|
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||||
|
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
DatanodeManager dm = mockDatanodeManager(fsn, conf);
|
||||||
|
|
||||||
|
String storageID = "someStorageID1";
|
||||||
|
String ipOld = "someIPOld" + storageID;
|
||||||
|
String ipNew = "someIPNew" + storageID;
|
||||||
|
|
||||||
|
dm.registerDatanode(new DatanodeRegistration(
|
||||||
|
new DatanodeID(ipOld, "", storageID, 9000, 0, 0, 0),
|
||||||
|
null, null, "version"));
|
||||||
|
|
||||||
|
dm.registerDatanode(new DatanodeRegistration(
|
||||||
|
new DatanodeID(ipNew, "", storageID, 9000, 0, 0, 0),
|
||||||
|
null, null, "version"));
|
||||||
|
|
||||||
|
assertNull("should be no node with old ip", dm.getDatanodeByHost(ipOld));
|
||||||
|
assertNotNull("should be a node with new ip", dm.getDatanodeByHost(ipNew));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test sends a random sequence of node registrations and node removals
|
* This test sends a random sequence of node registrations and node removals
|
||||||
* to the DatanodeManager (of nodes with different IDs and versions), and
|
* to the DatanodeManager (of nodes with different IDs and versions), and
|
||||||
|
|
Loading…
Reference in New Issue