HDFS-11499. Decommissioning stuck because of failing recovery. Contributed by Lukas Majercak and Manoj Govindassamy.

This commit is contained in:
Masatake Iwasaki 2017-03-09 13:30:33 +09:00
parent 570827a819
commit 385d2cb777
3 changed files with 108 additions and 1 deletions

View File

@ -891,7 +891,15 @@ public class BlockManager implements BlockStatsMXBean {
lastBlock.getUnderConstructionFeature()
.updateStorageScheduledSize((BlockInfoStriped) lastBlock);
}
if (hasMinStorage(lastBlock)) {
// Count replicas on decommissioning nodes, as these will not be
// decommissioned unless recovery/completing last block has finished
NumberReplicas numReplicas = countNodes(lastBlock);
int numUsableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissioning() +
numReplicas.liveEnteringMaintenanceReplicas();
if (hasMinStorage(lastBlock, numUsableReplicas)) {
if (committed) {
addExpectedReplicasToPending(lastBlock);
}

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -647,6 +648,53 @@ public class TestDecommission extends AdminStatesBaseTest {
fdos.close();
}
@Test(timeout = 360000)
public void testDecommissionWithOpenFileAndBlockRecovery()
throws IOException, InterruptedException {
startCluster(1, 6);
getCluster().waitActive();
Path file = new Path("/testRecoveryDecommission");
// Create a file and never close the output stream to trigger recovery
DistributedFileSystem dfs = getCluster().getFileSystem();
FSDataOutputStream out = dfs.create(file, true,
getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) 3, blockSize);
// Write data to the file
long writtenBytes = 0;
while (writtenBytes < fileSize) {
out.writeLong(writtenBytes);
writtenBytes += 8;
}
out.hsync();
DatanodeInfo[] lastBlockLocations = NameNodeAdapter.getBlockLocations(
getCluster().getNameNode(), "/testRecoveryDecommission", 0, fileSize)
.getLastLocatedBlock().getLocations();
// Decommission all nodes of the last block
ArrayList<String> toDecom = new ArrayList<>();
for (DatanodeInfo dnDecom : lastBlockLocations) {
toDecom.add(dnDecom.getXferAddr());
}
initExcludeHosts(toDecom);
refreshNodes(0);
// Make sure hard lease expires to trigger replica recovery
getCluster().setLeasePeriod(300L, 300L);
Thread.sleep(2 * BLOCKREPORT_INTERVAL_MSEC);
for (DatanodeInfo dnDecom : lastBlockLocations) {
DatanodeInfo datanode = NameNodeAdapter.getDatanode(
getCluster().getNamesystem(), dnDecom);
waitNodeState(datanode, AdminStates.DECOMMISSIONED);
}
assertEquals(dfs.getFileStatus(file).getLen(), writtenBytes);
}
/**
* Tests restart of namenode while datanode hosts are added to exclude file
**/

View File

@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +43,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Time;
import org.junit.Test;
@ -940,6 +944,53 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
cleanupFile(fileSys, file);
}
@Test(timeout = 120000)
public void testFileCloseAfterEnteringMaintenance() throws Exception {
LOG.info("Starting testFileCloseAfterEnteringMaintenance");
int expirationInMs = 30 * 1000;
int numDataNodes = 3;
int numNameNodes = 1;
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
startCluster(numNameNodes, numDataNodes);
getCluster().waitActive();
FSNamesystem fsn = getCluster().getNameNode().getNamesystem();
List<String> hosts = new ArrayList<>();
for (DataNode dn : getCluster().getDataNodes()) {
hosts.add(dn.getDisplayName());
putNodeInService(0, dn.getDatanodeUuid());
}
assertEquals(numDataNodes, fsn.getNumLiveDataNodes());
Path openFile = new Path("/testClosingFileInMaintenance.dat");
// Lets write 2 blocks of data to the openFile
writeFile(getCluster().getFileSystem(), openFile, (short) 3);
// Lets write some more data and keep the file open
FSDataOutputStream fsDataOutputStream = getCluster().getFileSystem()
.append(openFile);
byte[] bytes = new byte[1024];
fsDataOutputStream.write(bytes);
fsDataOutputStream.hsync();
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
getCluster().getNameNode(0), openFile.toString(), 0, 3 * blockSize);
DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations();
// Request maintenance for DataNodes 1 and 2 which has the last block.
takeNodeOutofService(0,
Lists.newArrayList(dnInfos4LastBlock[0].getDatanodeUuid(),
dnInfos4LastBlock[1].getDatanodeUuid()),
Time.now() + expirationInMs,
null, null, AdminStates.ENTERING_MAINTENANCE);
// Closing the file should succeed even when the
// last blocks' nodes are entering maintenance.
fsDataOutputStream.close();
cleanupFile(getCluster().getFileSystem(), openFile);
}
static String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
Path name) throws IOException {
DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name);