HDFS-6791. Merge change r1616306 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616311 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-08-06 19:15:04 +00:00
parent 72d90780c2
commit 5c7571fa3f
4 changed files with 110 additions and 8 deletions

View File

@ -194,6 +194,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6790. DFSUtil Should Use configuration.getPassword for SSL passwords HDFS-6790. DFSUtil Should Use configuration.getPassword for SSL passwords
(Larry McCay via brandonli) (Larry McCay via brandonli)
HDFS-6791. A block could remain under replicated if all of its replicas are on
decommissioned nodes. (Ming Ma via jing9)
Release 2.5.0 - UNRELEASED Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -3177,6 +3177,15 @@ public class BlockManager {
} }
} }
} }
if (!status && !srcNode.isAlive) {
LOG.warn("srcNode " + srcNode + " is dead " +
"when decommission is in progress. Continue to mark " +
"it as decommission in progress. In that way, when it rejoins the " +
"cluster it can continue the decommission process.");
status = true;
}
srcNode.decommissioningStatus.set(underReplicatedBlocks, srcNode.decommissioningStatus.set(underReplicatedBlocks,
decommissionOnlyReplicas, decommissionOnlyReplicas,
underReplicatedInOpenFiles); underReplicatedInOpenFiles);

View File

@ -268,4 +268,14 @@ public class BlockManagerTestUtil {
} }
return reports.toArray(StorageReport.EMPTY_ARRAY); return reports.toArray(StorageReport.EMPTY_ARRAY);
} }
/**
* Have DatanodeManager check decommission state.
* @param dm the DatanodeManager to manipulate
*/
public static void checkDecommissionState(DatanodeManager dm,
DatanodeDescriptor node) {
dm.checkDecommissionState(node);
}
} }

View File

@ -31,18 +31,24 @@ import java.util.Random;
import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -88,6 +94,8 @@ public class TestDecommissioningStatus {
4); 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
writeConfigFile(localFileSys, excludeFile, null); writeConfigFile(localFileSys, excludeFile, null);
writeConfigFile(localFileSys, includeFile, null); writeConfigFile(localFileSys, includeFile, null);
@ -98,6 +106,7 @@ public class TestDecommissioningStatus {
@AfterClass @AfterClass
public static void tearDown() throws Exception { public static void tearDown() throws Exception {
if (localFileSys != null ) cleanupFile(localFileSys, dir);
if(fileSys != null) fileSys.close(); if(fileSys != null) fileSys.close();
if(cluster != null) cluster.shutdown(); if(cluster != null) cluster.shutdown();
} }
@ -150,7 +159,8 @@ public class TestDecommissioningStatus {
return stm; return stm;
} }
private void cleanupFile(FileSystem fileSys, Path name) throws IOException { static private void cleanupFile(FileSystem fileSys, Path name)
throws IOException {
assertTrue(fileSys.exists(name)); assertTrue(fileSys.exists(name));
fileSys.delete(name, true); fileSys.delete(name, true);
assertTrue(!fileSys.exists(name)); assertTrue(!fileSys.exists(name));
@ -159,19 +169,26 @@ public class TestDecommissioningStatus {
/* /*
* Decommissions the node at the given index * Decommissions the node at the given index
*/ */
private String decommissionNode(FSNamesystem namesystem, private String decommissionNode(FSNamesystem namesystem, DFSClient client,
DFSClient client, FileSystem localFileSys, int nodeIndex) FileSystem localFileSys, int nodeIndex) throws IOException {
throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
String nodename = info[nodeIndex].getXferAddr(); String nodename = info[nodeIndex].getXferAddr();
System.out.println("Decommissioning node: " + nodename); decommissionNode(namesystem, localFileSys, nodename);
return nodename;
}
/*
* Decommissions the node by name
*/
private void decommissionNode(FSNamesystem namesystem,
FileSystem localFileSys, String dnName) throws IOException {
System.out.println("Decommissioning node: " + dnName);
// write nodename into the exclude file. // write nodename into the exclude file.
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes); ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
nodes.add(nodename); nodes.add(dnName);
writeConfigFile(localFileSys, excludeFile, nodes); writeConfigFile(localFileSys, excludeFile, nodes);
return nodename;
} }
private void checkDecommissionStatus(DatanodeDescriptor decommNode, private void checkDecommissionStatus(DatanodeDescriptor decommNode,
@ -287,6 +304,69 @@ public class TestDecommissioningStatus {
st1.close(); st1.close();
cleanupFile(fileSys, file1); cleanupFile(fileSys, file1);
cleanupFile(fileSys, file2); cleanupFile(fileSys, file2);
cleanupFile(localFileSys, dir); }
/**
* Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
* as dead before decommission has completed. That will allow DN to resume
* the replication process after it rejoins the cluster.
*/
@Test(timeout=120000)
public void testDecommissionStatusAfterDNRestart()
throws IOException, InterruptedException {
DistributedFileSystem fileSys =
(DistributedFileSystem)cluster.getFileSystem();
// Create a file with one block. That block has one replica.
Path f = new Path("decommission.dat");
DFSTestUtil.createFile(fileSys, f, fileSize, fileSize, fileSize,
(short)1, seed);
// Find the DN that owns the only replica.
RemoteIterator<LocatedFileStatus> fileList = fileSys.listLocatedStatus(f);
BlockLocation[] blockLocations = fileList.next().getBlockLocations();
String dnName = blockLocations[0].getNames()[0];
// Decommission the DN.
FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
decommissionNode(fsn, localFileSys, dnName);
dm.refreshNodes(conf);
// Stop the DN when decommission is in progress.
// Given DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY is to 1 and the size of
// the block, it will take much longer time that test timeout value for
// the decommission to complete. So when stopDataNode is called,
// decommission should be in progress.
DataNodeProperties dataNodeProperties = cluster.stopDataNode(dnName);
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
while (true) {
dm.fetchDatanodes(null, dead, false);
if (dead.size() == 1) {
break;
}
Thread.sleep(1000);
}
// Force removal of the dead node's blocks.
BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
// Force DatanodeManager to check decommission state.
BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
// Verify that the DN remains in DECOMMISSION_INPROGRESS state.
assertTrue("the node is in decommissioned state ",
!dead.get(0).isDecommissioned());
// Add the node back
cluster.restartDataNode(dataNodeProperties, true);
cluster.waitActive();
// Call refreshNodes on FSNamesystem with empty exclude file.
// This will remove the datanodes from decommissioning list and
// make them available again.
writeConfigFile(localFileSys, excludeFile, null);
dm.refreshNodes(conf);
cleanupFile(fileSys, f);
} }
} }