HDFS-6791. A block could remain under replicated if all of its replicas are on decommissioned nodes. Contributed by Ming Ma.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616306 13f79535-47bb-0310-9956-ffa450edef68
commit efc73a0f13
parent 9ebeac16eb
@@ -449,6 +449,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6790. DFSUtil Should Use configuration.getPassword for SSL passwords
     (Larry McCay via brandonli)
 
+    HDFS-6791. A block could remain under replicated if all of its replicas are on
+    decommissioned nodes. (Ming Ma via jing9)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -3174,6 +3174,15 @@ public class BlockManager {
         }
       }
     }
+
+    if (!status && !srcNode.isAlive) {
+      LOG.warn("srcNode " + srcNode + " is dead " +
+          "when decommission is in progress. Continue to mark " +
+          "it as decommission in progress. In that way, when it rejoins the " +
+          "cluster it can continue the decommission process.");
+      status = true;
+    }
+
     srcNode.decommissioningStatus.set(underReplicatedBlocks,
         decommissionOnlyReplicas,
         underReplicatedInOpenFiles);
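The hunk above is the core of the fix. The surrounding method computes a status flag that is true while replication work for the decommissioning node is still outstanding. Before this patch, a node that died mid-decommission could report no outstanding work (the NameNode removes a dead node's blocks, so nothing remains tracked for it) and so be promoted to DECOMMISSIONED even though its replicas were never copied anywhere. The added branch pins a dead, work-free node at "in progress". A minimal standalone sketch of that decision, using hypothetical simplified types rather than the real Hadoop classes:

// Sketch only: Node and DecomState are hypothetical stand-ins for the
// real DatanodeDescriptor state; the logic mirrors the patch above.
enum DecomState { IN_PROGRESS, DECOMMISSIONED }

class Node {
  boolean isAlive;
  int pendingWork;  // replication work still tracked for this node
  DecomState state = DecomState.IN_PROGRESS;
}

class DecommissionMonitor {
  // True while decommission must stay in progress.
  static boolean isReplicationInProgress(Node srcNode) {
    boolean status = srcNode.pendingWork > 0;
    if (!status && !srcNode.isAlive) {
      // No tracked work only because the node is dead, not because its
      // blocks are safe; keep it IN_PROGRESS so decommission resumes
      // when the node rejoins the cluster.
      status = true;
    }
    return status;
  }

  static void check(Node srcNode) {
    if (!isReplicationInProgress(srcNode)) {
      srcNode.state = DecomState.DECOMMISSIONED;
    }
  }
}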
@@ -268,4 +268,14 @@ public class BlockManagerTestUtil {
     }
     return reports.toArray(StorageReport.EMPTY_ARRAY);
   }
+
+  /**
+   * Have DatanodeManager check decommission state.
+   * @param dm the DatanodeManager to manipulate
+   */
+  public static void checkDecommissionState(DatanodeManager dm,
+      DatanodeDescriptor node) {
+    dm.checkDecommissionState(node);
+  }
+
 }
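The new helper gives tests outside the blockmanagement package a way to trigger DatanodeManager's decommission-state check on demand instead of waiting for the periodic monitor. A usage sketch, assuming a running MiniDFSCluster named cluster and a DatanodeDescriptor node already in hand (both names hypothetical here):

// Hypothetical usage; cluster is a running MiniDFSCluster.
DatanodeManager dm =
    cluster.getNamesystem().getBlockManager().getDatanodeManager();
// Re-evaluate decommission state for this one node immediately, rather
// than waiting for the monitor's next pass.
BlockManagerTestUtil.checkDecommissionState(dm, node);

This is exactly how the new test below consumes it.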
@@ -31,19 +31,24 @@ import java.util.Random;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -89,6 +94,8 @@ public class TestDecommissioningStatus {
         4);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
+
     writeConfigFile(localFileSys, excludeFile, null);
     writeConfigFile(localFileSys, includeFile, null);
 
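The added DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY setting is the timing lever for the new test: with transfer bandwidth throttled to one byte per second, copying even a small block off the decommissioning node takes far longer than the 120-second test timeout, so the node is guaranteed to still be mid-decommission when the test stops it. In isolation the setup looks like this (a sketch; conf is the suite's shared Configuration):

Configuration conf = new HdfsConfiguration();
// Re-check decommission progress every second...
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
// ...but throttle block transfers to 1 byte/sec so decommission cannot
// actually complete within the test's 120 s timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);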
@@ -99,6 +106,7 @@ public class TestDecommissioningStatus {
 
   @AfterClass
   public static void tearDown() throws Exception {
+    if (localFileSys != null ) cleanupFile(localFileSys, dir);
     if(fileSys != null) fileSys.close();
     if(cluster != null) cluster.shutdown();
   }
@@ -138,7 +146,8 @@ public class TestDecommissioningStatus {
     return stm;
   }
 
-  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+  static private void cleanupFile(FileSystem fileSys, Path name)
+      throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
     assertTrue(!fileSys.exists(name));
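The signature change above is required, not cosmetic: cleanupFile is now called from the @AfterClass tearDown() method added earlier, and JUnit requires @AfterClass methods to be static, so anything they call without an instance must be static too.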
@@ -147,19 +156,26 @@
   /*
    * Decommissions the node at the given index
    */
-  private String decommissionNode(FSNamesystem namesystem,
-      DFSClient client, FileSystem localFileSys, int nodeIndex)
-      throws IOException {
+  private String decommissionNode(FSNamesystem namesystem, DFSClient client,
+      FileSystem localFileSys, int nodeIndex) throws IOException {
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     String nodename = info[nodeIndex].getXferAddr();
-    System.out.println("Decommissioning node: " + nodename);
+    decommissionNode(namesystem, localFileSys, nodename);
+    return nodename;
+  }
+
+  /*
+   * Decommissions the node by name
+   */
+  private void decommissionNode(FSNamesystem namesystem,
+      FileSystem localFileSys, String dnName) throws IOException {
+    System.out.println("Decommissioning node: " + dnName);
 
     // write nodename into the exclude file.
     ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
-    nodes.add(nodename);
+    nodes.add(dnName);
     writeConfigFile(localFileSys, excludeFile, nodes);
-    return nodename;
   }
 
   private void checkDecommissionStatus(DatanodeDescriptor decommNode,
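Splitting decommissionNode into by-index and by-name overloads lets the new test target a specific DataNode: it reads the replica holder's transfer address from the block locations and decommissions exactly that node. Condensed from the test that follows (fsn, localFileSys, dm, conf, and blockLocations as defined there):

String dnName = blockLocations[0].getNames()[0];  // e.g. "127.0.0.1:50010"
decommissionNode(fsn, localFileSys, dnName);      // add to the exclude file
dm.refreshNodes(conf);                            // tell the NN to act on it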
@@ -276,6 +292,69 @@
     st1.close();
     cleanupFile(fileSys, file1);
     cleanupFile(fileSys, file2);
-    cleanupFile(localFileSys, dir);
   }
 
+  /**
+   * Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
+   * as dead before decommission has completed. That will allow DN to resume
+   * the replication process after it rejoins the cluster.
+   */
+  @Test(timeout=120000)
+  public void testDecommissionStatusAfterDNRestart()
+      throws IOException, InterruptedException {
+    DistributedFileSystem fileSys =
+        (DistributedFileSystem)cluster.getFileSystem();
+
+    // Create a file with one block. That block has one replica.
+    Path f = new Path("decommission.dat");
+    DFSTestUtil.createFile(fileSys, f, fileSize, fileSize, fileSize,
+        (short)1, seed);
+
+    // Find the DN that owns the only replica.
+    RemoteIterator<LocatedFileStatus> fileList = fileSys.listLocatedStatus(f);
+    BlockLocation[] blockLocations = fileList.next().getBlockLocations();
+    String dnName = blockLocations[0].getNames()[0];
+
+    // Decommission the DN.
+    FSNamesystem fsn = cluster.getNamesystem();
+    final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    decommissionNode(fsn, localFileSys, dnName);
+    dm.refreshNodes(conf);
+
+    // Stop the DN when decommission is in progress.
+    // Given DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY is set to 1 and the
+    // size of the block, decommission will take much longer than the test
+    // timeout to complete. So when stopDataNode is called, decommission
+    // should still be in progress.
+    DataNodeProperties dataNodeProperties = cluster.stopDataNode(dnName);
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    while (true) {
+      dm.fetchDatanodes(null, dead, false);
+      if (dead.size() == 1) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    // Force removal of the dead node's blocks.
+    BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
+
+    // Force DatanodeManager to check decommission state.
+    BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+
+    // Verify that the DN remains in DECOMMISSION_INPROGRESS state.
+    assertTrue("the node is in decommissioned state ",
+        !dead.get(0).isDecommissioned());
+
+    // Add the node back
+    cluster.restartDataNode(dataNodeProperties, true);
+    cluster.waitActive();
+
+    // Call refreshNodes on FSNamesystem with empty exclude file.
+    // This will remove the datanodes from decommissioning list and
+    // make them available again.
+    writeConfigFile(localFileSys, excludeFile, null);
+    dm.refreshNodes(conf);
+    cleanupFile(fileSys, f);
+  }
 }
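One design note on the new test: its dead-node wait loop is unbounded and leans on the JUnit @Test(timeout=120000) to break a hang. A variation with an explicit deadline (a sketch, not the committed code; dm as in the test, assertEquals from org.junit.Assert) would fail faster and with a clearer message:

final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
long deadline = System.currentTimeMillis() + 60 * 1000;
while (System.currentTimeMillis() < deadline) {
  dm.fetchDatanodes(null, dead, false);  // refreshes the dead list
  if (dead.size() == 1) {
    break;
  }
  Thread.sleep(1000);
}
assertEquals("DataNode was never reported dead", 1, dead.size());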