HDFS-7208. NN doesn't schedule replication when a DN storage fails. Contributed by Ming Ma

This commit is contained in:
Tsz-Wo Nicholas Sze 2014-10-15 20:44:24 -07:00
parent b3056c266a
commit 41980c56d3
9 changed files with 209 additions and 12 deletions

View File

@ -959,6 +959,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7185. The active NameNode will not accept an fsimage sent from the
standby during rolling upgrade. (jing9)
HDFS-7208. NN doesn't schedule replication when a DN storage fails.
(Ming Ma via szetszwo)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -1062,6 +1062,19 @@ public class BlockManager {
}
}
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
assert namesystem.hasWriteLock();
final Iterator<? extends Block> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
Block block = it.next();
removeStoredBlock(block, node);
invalidateBlocks.remove(node, block);
}
namesystem.checkSafeMode();
}
/**
* Adds block to list of blocks which will be invalidated on specified
* datanode and log the operation

View File

@ -722,6 +722,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
logNodeIsNotChosen(storage, "storage is read-only");
return false;
}
if (storage.getState() == State.FAILED) {
logNodeIsNotChosen(storage, "storage has failed");
return false;
}
DatanodeDescriptor node = storage.getDatanodeDescriptor();
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {

View File

@ -21,11 +21,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
@ -222,13 +224,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
// The number of replication work pending before targets are determined
private int PendingReplicationWithoutTargets = 0;
// HB processing can use it to tell if it is the first HB since DN restarted
private boolean heartbeatedSinceRegistration = false;
/**
* DatanodeDescriptor constructor
* @param nodeID id of the data node
*/
public DatanodeDescriptor(DatanodeID nodeID) {
super(nodeID);
updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
/**
@ -239,7 +244,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
super(nodeID, networkLocation);
updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
@VisibleForTesting
@ -341,10 +346,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
*/
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures);
heartbeatedSinceRegistration = true;
}
/**
* process datanode heartbeat or stats initialization.
*/
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures) {
long totalCapacity = 0;
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
long totalDfsUsed = 0;
Set<DatanodeStorageInfo> failedStorageInfos = null;
// Decide if we should check for any missing StorageReport and mark it as
// failed. There are different scenarios.
// 1. When DN is running, a storage failed. Given the current DN
// implementation doesn't add recovered storage back to its storage list
// until DN restart, we can assume volFailures won't decrease
// during the current DN registration session.
// When volumeFailures == this.volumeFailures, it implies there is no
// state change. No need to check for failed storage. This is an
// optimization.
// 2. After DN restarts, volFailures might not increase and it is possible
// we still have new failed storage. For example, admins reduce
// available storages in configuration. Another corner case
// is the failed volumes might change after restart; a) there
// is one good storage A, one restored good storage B, so there is
// one element in storageReports and that is A. b) A failed. c) Before
// DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
// restarts, storageReports has one element which is B.
boolean checkFailedStorages = (volFailures > this.volumeFailures) ||
!heartbeatedSinceRegistration;
if (checkFailedStorages) {
LOG.info("Number of failed storage changes from "
+ this.volumeFailures + " to " + volFailures);
failedStorageInfos = new HashSet<DatanodeStorageInfo>(
storageMap.values());
}
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
@ -353,6 +396,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.volumeFailures = volFailures;
for (StorageReport report : reports) {
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
failedStorageInfos.remove(storage);
}
storage.receivedHeartbeat(report);
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
@ -366,6 +413,19 @@ public class DatanodeDescriptor extends DatanodeInfo {
setRemaining(totalRemaining);
setBlockPoolUsed(totalBlockPoolUsed);
setDfsUsed(totalDfsUsed);
if (checkFailedStorages) {
updateFailedStorage(failedStorageInfos);
}
}
private void updateFailedStorage(
Set<DatanodeStorageInfo> failedStorageInfos) {
for (DatanodeStorageInfo storageInfo : failedStorageInfos) {
if (storageInfo.getState() != DatanodeStorage.State.FAILED) {
LOG.info(storageInfo + " failed.");
storageInfo.setState(DatanodeStorage.State.FAILED);
}
}
}
private static class BlockIterator implements Iterator<BlockInfo> {
@ -639,6 +699,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
for(DatanodeStorageInfo storage : getStorageInfos()) {
storage.setBlockReportCount(0);
}
heartbeatedSinceRegistration = false;
}
/**

View File

@ -183,6 +183,14 @@ public class DatanodeStorageInfo {
return this.state;
}
void setState(State state) {
this.state = state;
}
boolean areBlocksOnFailedStorage() {
return getState() == State.FAILED && numBlocks != 0;
}
String getStorageID() {
return storageID;
}

View File

@ -192,7 +192,7 @@ class HeartbeatManager implements DatanodeStatistics {
addDatanode(d);
//update its timestamp
d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
}
@ -242,6 +242,25 @@ class HeartbeatManager implements DatanodeStatistics {
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
* Check if there are any failed storage and if so,
* Remove all the blocks on the storage. It also covers the following less
* common scenarios. After DatanodeStorage is marked FAILED, it is still
* possible to receive IBR for this storage.
* 1) DN could deliver IBR for failed storage due to its implementation.
* a) DN queues a pending IBR request.
* b) The storage of the block fails.
* c) DN first sends HB, NN will mark the storage FAILED.
* d) DN then sends the pending IBR request.
* 2) SBN processes block request from pendingDNMessages.
* It is possible to have messages in pendingDNMessages that refer
* to some failed storage.
* a) SBN receives a IBR and put it in pendingDNMessages.
* b) The storage of the block fails.
* c) Edit log replay get the IBR from pendingDNMessages.
* Alternatively, we can resolve these scenarios with the following approaches.
* A. Make sure DN don't deliver IBR for failed storage.
* B. Remove all blocks in PendingDataNodeMessages for the failed storage
* when we remove all blocks from BlocksMap for that storage.
*/
void heartbeatCheck() {
final DatanodeManager dm = blockManager.getDatanodeManager();
@ -254,6 +273,10 @@ class HeartbeatManager implements DatanodeStatistics {
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
// locate the first failed storage that isn't on a dead node.
DatanodeStorageInfo failedStorage = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
int numOfStaleStorages = 0;
@ -271,16 +294,23 @@ class HeartbeatManager implements DatanodeStatistics {
if (storageInfo.areBlockContentsStale()) {
numOfStaleStorages++;
}
if (failedStorage == null &&
storageInfo.areBlocksOnFailedStorage() &&
d != dead) {
failedStorage = storageInfo;
}
}
}
// Set the number of stale nodes in the DatanodeManager
dm.setNumStaleNodes(numOfStaleNodes);
dm.setNumStaleStorages(numOfStaleStorages);
}
allAlive = dead == null;
if (!allAlive) {
allAlive = dead == null && failedStorage == null;
if (dead != null) {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
try {
@ -294,6 +324,20 @@ class HeartbeatManager implements DatanodeStatistics {
namesystem.writeUnlock();
}
}
if (failedStorage != null) {
// acquire the fsnamesystem lock, and remove blocks on the storage.
namesystem.writeLock();
try {
if (namesystem.isInStartupSafeMode()) {
return;
}
synchronized(this) {
blockManager.removeBlocksAssociatedTo(failedStorage);
}
} finally {
namesystem.writeUnlock();
}
}
}
}

View File

@ -39,7 +39,9 @@ public class DatanodeStorage {
* property should be used for debugging purposes only.
* </p>
*/
READ_ONLY_SHARED;
READ_ONLY_SHARED,
FAILED;
}
private final String storageID;

View File

@ -214,6 +214,25 @@ public class BlockManagerTestUtil {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}
/**
* Call heartbeat check function of HeartbeatManager and get
* under replicated blocks count within write lock to make sure
* computeDatanodeWork doesn't interfere.
* @param namesystem the FSNamesystem
* @param bm the BlockManager to manipulate
* @return the number of under replicated blocks
*/
public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
FSNamesystem namesystem, BlockManager bm) {
namesystem.writeLock();
try {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
return bm.getUnderReplicatedNotMissingBlocks();
} finally {
namesystem.writeUnlock();
}
}
public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn,
DatanodeStorage s) {
return dn.updateStorage(s);

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.FilenameFilter;
@ -52,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -79,6 +82,7 @@ public class TestDataNodeVolumeFailure {
File dataDir = null;
File data_fail = null;
File failedDir = null;
private FileSystem fs;
// mapping blocks to Meta files(physical files) and locs(NameNode locations)
private class BlockLocs {
@ -97,6 +101,8 @@ public class TestDataNodeVolumeFailure {
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dataDir = new File(cluster.getDataDirectory());
}
@After
@ -110,6 +116,10 @@ public class TestDataNodeVolumeFailure {
if(cluster != null) {
cluster.shutdown();
}
for (int i = 0; i < 3; i++) {
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
}
}
/*
@ -119,8 +129,6 @@ public class TestDataNodeVolumeFailure {
*/
@Test
public void testVolumeFailure() throws Exception {
FileSystem fs = cluster.getFileSystem();
dataDir = new File(cluster.getDataDirectory());
System.out.println("Data dir: is " + dataDir.getPath());
@ -192,6 +200,39 @@ public class TestDataNodeVolumeFailure {
" is created and replicated");
}
/**
* Test that there are under replication blocks after vol failures
*/
@Test
public void testUnderReplicationAfterVolFailure() throws Exception {
// Bring up one more datanode
cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive();
final BlockManager bm = cluster.getNamesystem().getBlockManager();
Path file1 = new Path("/test1");
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)3);
// Fail the first volume on both datanodes
File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, false));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, false));
Path file2 = new Path("/test2");
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file2, (short)3);
// underReplicatedBlocks are due to failed volumes
int underReplicatedBlocks =
BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
cluster.getNamesystem(), bm);
assertTrue("There is no under replicated block after volume failure",
underReplicatedBlocks > 0);
}
/**
* verifies two things:
* 1. number of locations of each block in the name node