HDFS-8950. NameNode refresh doesn't remove DataNodes that are no longer in the allowed list. Contributed by Daniel Templeton.
commit 59a2072135 (parent 1d23e1ec07)
CHANGES.txt
@@ -47,6 +47,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-8879. Quota by storage type usage incorrectly initialized upon namenode
     restart. (xyao)
 
+    HDFS-8950. NameNode refresh doesn't remove DataNodes that are no longer in
+    the allowed list (Daniel Templeton)
+
     HDFS-8995. Flaw in registration bookeeping can make DN die on reconnect.
     (Kihwal Lee via yliu)
 
DatanodeManager.java
@@ -1272,11 +1272,14 @@ public class DatanodeManager {
     for (DatanodeDescriptor dn : datanodeMap.values()) {
       final boolean isDead = isDatanodeDead(dn);
       final boolean isDecommissioning = dn.isDecommissionInProgress();
-      if ((listLiveNodes && !isDead) ||
+
+      if (((listLiveNodes && !isDead) ||
           (listDeadNodes && isDead) ||
-          (listDecommissioningNodes && isDecommissioning)) {
-        nodes.add(dn);
+          (listDecommissioningNodes && isDecommissioning)) &&
+          hostFileManager.isIncluded(dn)) {
+        nodes.add(dn);
       }
+
       foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));
     }
   }
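
The heart of the fix is the added hostFileManager.isIncluded(dn) conjunct: a DataNode must now be present in the includes list (dfs.hosts) to show up in any report, live or dead. Below is a minimal, self-contained sketch of that gating logic (hypothetical names, not the actual Hadoop classes); per the semantics exercised by the tests further down, an empty includes list means no include file is configured, so every host passes.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    // Illustrative model of the report filter above, not the real code.
    class ReportFilterSketch {
      // Stand-in for HostFileManager.isIncluded(dn): an empty includes set
      // means no include list is configured, so every host is allowed.
      static boolean isIncluded(Set<String> includes, String host) {
        return includes.isEmpty() || includes.contains(host);
      }

      // Stand-in for getDatanodeListForReport(): the live/dead/decommissioning
      // predicate is now ANDed with includes-list membership.
      static List<String> listForReport(List<String> candidates,
                                        Set<String> includes) {
        List<String> nodes = new ArrayList<>();
        for (String host : candidates) {
          if (isIncluded(includes, host)) { // the new gate from this hunk
            nodes.add(host);
          }
        }
        return nodes;
      }
    }
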
HostFileManager.java
@@ -126,9 +126,28 @@ class HostFileManager {
     return !includes.isEmpty();
   }
 
+  /**
+   * Read the includes and excludes lists from the named files. Any previous
+   * includes and excludes lists are discarded.
+   * @param includeFile the path to the new includes list
+   * @param excludeFile the path to the new excludes list
+   * @throws IOException thrown if there is a problem reading one of the files
+   */
   void refresh(String includeFile, String excludeFile) throws IOException {
     HostSet newIncludes = readFile("included", includeFile);
     HostSet newExcludes = readFile("excluded", excludeFile);
+
+    refresh(newIncludes, newExcludes);
+  }
+
+  /**
+   * Set the includes and excludes lists by the new HostSet instances. The
+   * old instances are discarded.
+   * @param newIncludes the new includes list
+   * @param newExcludes the new excludes list
+   */
+  @VisibleForTesting
+  void refresh(HostSet newIncludes, HostSet newExcludes) {
     synchronized (this) {
       includes = newIncludes;
       excludes = newExcludes;
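
Splitting refresh() into a file-reading overload and a set-swapping overload is what makes the new tests possible: production code still refreshes from dfs.hosts / dfs.hosts.exclude on disk, while tests inject prebuilt HostSets through the @VisibleForTesting overload. A hedged sketch of the pattern (a hypothetical class that mirrors the shape of the code above, not the actual source):

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    class HostListsSketch {
      private Set<String> includes = new HashSet<>();
      private Set<String> excludes = new HashSet<>();

      // Production entry point: do the slow file I/O first, outside any lock.
      void refresh(String includeFile, String excludeFile) throws IOException {
        refresh(readFile(includeFile), readFile(excludeFile));
      }

      // Test entry point: swap both references under one monitor so readers
      // never observe a half-updated includes/excludes pair.
      synchronized void refresh(Set<String> newIncludes,
                                Set<String> newExcludes) {
        includes = newIncludes;
        excludes = newExcludes;
      }

      private static Set<String> readFile(String path) throws IOException {
        return new HashSet<>(); // placeholder parser, for the sketch only
      }
    }
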
TestDecommission.java
@@ -810,16 +810,13 @@ public class TestDecommission {
     }
     assertEquals("Number of live nodes should be 0", 0, info.length);
 
-    // Test that non-live and bogus hostnames are considered "dead".
-    // The dead report should have an entry for (1) the DN that is
-    // now considered dead because it is no longer allowed to connect
-    // and (2) the bogus entry in the hosts file (these entries are
-    // always added last)
+    // Test that bogus hostnames are considered "dead".
+    // The dead report should have an entry for the bogus entry in the hosts
+    // file. The original datanode is excluded from the report because it
+    // is no longer in the included list.
     info = client.datanodeReport(DatanodeReportType.DEAD);
-    assertEquals("There should be 2 dead nodes", 2, info.length);
-    DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
-    assertEquals(id.getHostName(), info[0].getHostName());
-    assertEquals(bogusIp, info[1].getHostName());
+    assertEquals("There should be 1 dead node", 1, info.length);
+    assertEquals(bogusIp, info[0].getHostName());
   }
 }
 
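
The revised assertions follow directly from the new gate in DatanodeManager: a DataNode dropped from the includes list no longer appears in any report, while a bogus hosts-file entry still surfaces as dead. A recap of the expected values (mirroring the assertions above, not additional test code):

    DatanodeInfo[] dead = client.datanodeReport(DatanodeReportType.DEAD);
    // Before the fix: { no-longer-included DN, bogus hosts-file entry } -> length 2
    // After the fix:  { bogus hosts-file entry }                        -> length 1
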
TestDatanodeManager.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -33,17 +35,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
@@ -54,6 +61,22 @@ public class TestDatanodeManager {
   //The number of times the registration / removal of nodes should happen
   final int NUM_ITERATIONS = 500;
 
+  private static DatanodeManager mockDatanodeManager(
+      FSNamesystem fsn, Configuration conf) throws IOException {
+    BlockManager bm = Mockito.mock(BlockManager.class);
+    DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
+    return dm;
+  }
+
+  /**
+   * Create an InetSocketAddress for a host:port string
+   * @param host a host identifier in host:port format
+   * @return a corresponding InetSocketAddress object
+   */
+  private static InetSocketAddress entry(String host) {
+    return HostFileManager.parseEntry("dummy", "dummy", host);
+  }
+
   /**
    * This test sends a random sequence of node registrations and node removals
    * to the DatanodeManager (of nodes with different IDs and versions), and
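
The entry() helper leans on HostFileManager.parseEntry, whose first two arguments (a file name and a line) appear to be used only for error reporting, hence the "dummy" placeholders. Hedged usage, assuming the helper above is in scope:

    InetSocketAddress withPort = entry("127.0.0.1:12345"); // explicit host:port
    InetSocketAddress anyPort  = entry("127.0.0.2");       // no port given; when
        // matched against a HostSet, a port of 0 acts as an any-port wildcard
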
@@ -294,4 +317,89 @@ public class TestDatanodeManager {
     assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
         is(DatanodeInfo.AdminStates.DECOMMISSIONED));
   }
+
+  /**
+   * Test whether removing a host from the includes list without adding it to
+   * the excludes list will exclude it from data node reports.
+   */
+  @Test
+  public void testRemoveIncludedNode() throws IOException {
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+
+    // Set the write lock so that the DatanodeManager can start
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+
+    DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
+    HostFileManager hm = new HostFileManager();
+    HostFileManager.HostSet noNodes = new HostFileManager.HostSet();
+    HostFileManager.HostSet oneNode = new HostFileManager.HostSet();
+    HostFileManager.HostSet twoNodes = new HostFileManager.HostSet();
+    DatanodeRegistration dr1 = new DatanodeRegistration(
+        new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
+            12345, 12345, 12345, 12345),
+        new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
+        new ExportedBlockKeys(), "test");
+    DatanodeRegistration dr2 = new DatanodeRegistration(
+        new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-234",
+            23456, 23456, 23456, 23456),
+        new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
+        new ExportedBlockKeys(), "test");
+
+    twoNodes.add(entry("127.0.0.1:12345"));
+    twoNodes.add(entry("127.0.0.1:23456"));
+    oneNode.add(entry("127.0.0.1:23456"));
+
+    hm.refresh(twoNodes, noNodes);
+    Whitebox.setInternalState(dm, "hostFileManager", hm);
+
+    // Register two data nodes to simulate them coming up.
+    // We need to add two nodes, because if we have only one node, removing it
+    // will cause the includes list to be empty, which means all hosts will be
+    // allowed.
+    dm.registerDatanode(dr1);
+    dm.registerDatanode(dr2);
+
+    // Make sure that both nodes are reported
+    List<DatanodeDescriptor> both =
+        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);
+
+    // Sort the list so that we know which one is which
+    Collections.sort(both);
+
+    Assert.assertEquals("Incorrect number of hosts reported",
+        2, both.size());
+    Assert.assertEquals("Unexpected host or host in unexpected position",
+        "127.0.0.1:12345", both.get(0).getInfoAddr());
+    Assert.assertEquals("Unexpected host or host in unexpected position",
+        "127.0.0.1:23456", both.get(1).getInfoAddr());
+
+    // Remove one node from includes, but do not add it to excludes.
+    hm.refresh(oneNode, noNodes);
+
+    // Make sure that only one node is still reported
+    List<DatanodeDescriptor> onlyOne =
+        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);
+
+    Assert.assertEquals("Incorrect number of hosts reported",
+        1, onlyOne.size());
+    Assert.assertEquals("Unexpected host reported",
+        "127.0.0.1:23456", onlyOne.get(0).getInfoAddr());
+
+    // Remove all nodes from includes
+    hm.refresh(noNodes, noNodes);
+
+    // Check that both nodes are reported again
+    List<DatanodeDescriptor> bothAgain =
+        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);
+
+    // Sort the list so that we know which one is which
+    Collections.sort(bothAgain);
+
+    Assert.assertEquals("Incorrect number of hosts reported",
+        2, bothAgain.size());
+    Assert.assertEquals("Unexpected host or host in unexpected position",
+        "127.0.0.1:12345", bothAgain.get(0).getInfoAddr());
+    Assert.assertEquals("Unexpected host or host in unexpected position",
+        "127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
+  }
 }
TestHostFileManager.java
@@ -104,23 +104,22 @@ public class TestHostFileManager {
     BlockManager bm = mock(BlockManager.class);
     FSNamesystem fsn = mock(FSNamesystem.class);
     Configuration conf = new Configuration();
-    HostFileManager hm = mock(HostFileManager.class);
+    HostFileManager hm = new HostFileManager();
     HostFileManager.HostSet includedNodes = new HostFileManager.HostSet();
     HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet();
 
     includedNodes.add(entry("127.0.0.1:12345"));
     includedNodes.add(entry("localhost:12345"));
     includedNodes.add(entry("127.0.0.1:12345"));
     includedNodes.add(entry("127.0.0.2"));
 
     excludedNodes.add(entry("127.0.0.1:12346"));
     excludedNodes.add(entry("127.0.30.1:12346"));
 
     Assert.assertEquals(2, includedNodes.size());
     Assert.assertEquals(2, excludedNodes.size());
 
-    doReturn(includedNodes).when(hm).getIncludes();
-    doReturn(excludedNodes).when(hm).getExcludes();
+    hm.refresh(includedNodes, excludedNodes);
 
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
     Whitebox.setInternalState(dm, "hostFileManager", hm);
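
Replacing mock(HostFileManager.class) with a real instance is forced by the DatanodeManager change: the report path now calls hm.isIncluded(dn), a method the old getIncludes()/getExcludes() stubs never covered. A hedged illustration of the failure mode the old mock would cause (not part of the patch; assumes the test's own imports and the DatanodeID constructor used above):

    // Mockito returns false for any unstubbed boolean method, so under the
    // new isIncluded(dn) gate a bare mock would filter every DataNode out
    // of the report.
    HostFileManager bare = mock(HostFileManager.class);
    boolean included = bare.isIncluded(new DatanodeID("127.0.0.1", "127.0.0.1",
        "someStorageID", 12345, 12345, 12345, 12345));
    // included == false -> getDatanodeListForReport() would return an empty list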