HDFS-7208. NN doesn't schedule replication when a DN storage fails. Contributed by Ming Ma
This commit is contained in:
parent
76cf2250de
commit
603293da92
|
@ -607,6 +607,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-7185. The active NameNode will not accept an fsimage sent from the
|
||||
standby during rolling upgrade. (jing9)
|
||||
|
||||
HDFS-7208. NN doesn't schedule replication when a DN storage fails.
|
||||
(Ming Ma via szetszwo)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
||||
|
|
|
@ -1058,6 +1058,19 @@ public class BlockManager {
|
|||
}
|
||||
}
|
||||
|
||||
/** Remove the blocks associated to the given DatanodeStorageInfo. */
|
||||
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
|
||||
assert namesystem.hasWriteLock();
|
||||
final Iterator<? extends Block> it = storageInfo.getBlockIterator();
|
||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
while(it.hasNext()) {
|
||||
Block block = it.next();
|
||||
removeStoredBlock(block, node);
|
||||
invalidateBlocks.remove(node, block);
|
||||
}
|
||||
namesystem.checkSafeMode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds block to list of blocks which will be invalidated on specified
|
||||
* datanode and log the operation
|
||||
|
|
|
@ -722,6 +722,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
logNodeIsNotChosen(storage, "storage is read-only");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (storage.getState() == State.FAILED) {
|
||||
logNodeIsNotChosen(storage, "storage has failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||
// check if the node is (being) decommissioned
|
||||
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
||||
|
|
|
@ -21,11 +21,13 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -224,13 +226,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
// The number of replication work pending before targets are determined
|
||||
private int PendingReplicationWithoutTargets = 0;
|
||||
|
||||
// HB processing can use it to tell if it is the first HB since DN restarted
|
||||
private boolean heartbeatedSinceRegistration = false;
|
||||
|
||||
/**
|
||||
* DatanodeDescriptor constructor
|
||||
* @param nodeID id of the data node
|
||||
*/
|
||||
public DatanodeDescriptor(DatanodeID nodeID) {
|
||||
super(nodeID);
|
||||
updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -241,7 +246,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
public DatanodeDescriptor(DatanodeID nodeID,
|
||||
String networkLocation) {
|
||||
super(nodeID, networkLocation);
|
||||
updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -343,10 +348,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
*/
|
||||
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
|
||||
long cacheUsed, int xceiverCount, int volFailures) {
|
||||
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
|
||||
volFailures);
|
||||
heartbeatedSinceRegistration = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* process datanode heartbeat or stats initialization.
|
||||
*/
|
||||
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
||||
long cacheUsed, int xceiverCount, int volFailures) {
|
||||
long totalCapacity = 0;
|
||||
long totalRemaining = 0;
|
||||
long totalBlockPoolUsed = 0;
|
||||
long totalDfsUsed = 0;
|
||||
Set<DatanodeStorageInfo> failedStorageInfos = null;
|
||||
|
||||
// Decide if we should check for any missing StorageReport and mark it as
|
||||
// failed. There are different scenarios.
|
||||
// 1. When DN is running, a storage failed. Given the current DN
|
||||
// implementation doesn't add recovered storage back to its storage list
|
||||
// until DN restart, we can assume volFailures won't decrease
|
||||
// during the current DN registration session.
|
||||
// When volumeFailures == this.volumeFailures, it implies there is no
|
||||
// state change. No need to check for failed storage. This is an
|
||||
// optimization.
|
||||
// 2. After DN restarts, volFailures might not increase and it is possible
|
||||
// we still have new failed storage. For example, admins reduce
|
||||
// available storages in configuration. Another corner case
|
||||
// is the failed volumes might change after restart; a) there
|
||||
// is one good storage A, one restored good storage B, so there is
|
||||
// one element in storageReports and that is A. b) A failed. c) Before
|
||||
// DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
|
||||
// restarts, storageReports has one element which is B.
|
||||
boolean checkFailedStorages = (volFailures > this.volumeFailures) ||
|
||||
!heartbeatedSinceRegistration;
|
||||
|
||||
if (checkFailedStorages) {
|
||||
LOG.info("Number of failed storage changes from "
|
||||
+ this.volumeFailures + " to " + volFailures);
|
||||
failedStorageInfos = new HashSet<DatanodeStorageInfo>(
|
||||
storageMap.values());
|
||||
}
|
||||
|
||||
setCacheCapacity(cacheCapacity);
|
||||
setCacheUsed(cacheUsed);
|
||||
|
@ -355,6 +398,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
this.volumeFailures = volFailures;
|
||||
for (StorageReport report : reports) {
|
||||
DatanodeStorageInfo storage = updateStorage(report.getStorage());
|
||||
if (checkFailedStorages) {
|
||||
failedStorageInfos.remove(storage);
|
||||
}
|
||||
|
||||
storage.receivedHeartbeat(report);
|
||||
totalCapacity += report.getCapacity();
|
||||
totalRemaining += report.getRemaining();
|
||||
|
@ -368,6 +415,19 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
setRemaining(totalRemaining);
|
||||
setBlockPoolUsed(totalBlockPoolUsed);
|
||||
setDfsUsed(totalDfsUsed);
|
||||
if (checkFailedStorages) {
|
||||
updateFailedStorage(failedStorageInfos);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateFailedStorage(
|
||||
Set<DatanodeStorageInfo> failedStorageInfos) {
|
||||
for (DatanodeStorageInfo storageInfo : failedStorageInfos) {
|
||||
if (storageInfo.getState() != DatanodeStorage.State.FAILED) {
|
||||
LOG.info(storageInfo + " failed.");
|
||||
storageInfo.setState(DatanodeStorage.State.FAILED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class BlockIterator implements Iterator<BlockInfo> {
|
||||
|
@ -641,6 +701,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
for(DatanodeStorageInfo storage : getStorageInfos()) {
|
||||
storage.setBlockReportCount(0);
|
||||
}
|
||||
heartbeatedSinceRegistration = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -182,7 +182,15 @@ public class DatanodeStorageInfo {
|
|||
State getState() {
|
||||
return this.state;
|
||||
}
|
||||
|
||||
|
||||
void setState(State state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
boolean areBlocksOnFailedStorage() {
|
||||
return getState() == State.FAILED && numBlocks != 0;
|
||||
}
|
||||
|
||||
String getStorageID() {
|
||||
return storageID;
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
addDatanode(d);
|
||||
|
||||
//update its timestamp
|
||||
d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -242,6 +242,25 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
* While removing dead datanodes, make sure that only one datanode is marked
|
||||
* dead at a time within the synchronized section. Otherwise, a cascading
|
||||
* effect causes more datanodes to be declared dead.
|
||||
* Check if there are any failed storage and if so,
|
||||
* Remove all the blocks on the storage. It also covers the following less
|
||||
* common scenarios. After DatanodeStorage is marked FAILED, it is still
|
||||
* possible to receive IBR for this storage.
|
||||
* 1) DN could deliver IBR for failed storage due to its implementation.
|
||||
* a) DN queues a pending IBR request.
|
||||
* b) The storage of the block fails.
|
||||
* c) DN first sends HB, NN will mark the storage FAILED.
|
||||
* d) DN then sends the pending IBR request.
|
||||
* 2) SBN processes block request from pendingDNMessages.
|
||||
* It is possible to have messages in pendingDNMessages that refer
|
||||
* to some failed storage.
|
||||
* a) SBN receives a IBR and put it in pendingDNMessages.
|
||||
* b) The storage of the block fails.
|
||||
* c) Edit log replay get the IBR from pendingDNMessages.
|
||||
* Alternatively, we can resolve these scenarios with the following approaches.
|
||||
* A. Make sure DN don't deliver IBR for failed storage.
|
||||
* B. Remove all blocks in PendingDataNodeMessages for the failed storage
|
||||
* when we remove all blocks from BlocksMap for that storage.
|
||||
*/
|
||||
void heartbeatCheck() {
|
||||
final DatanodeManager dm = blockManager.getDatanodeManager();
|
||||
|
@ -254,6 +273,10 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
while (!allAlive) {
|
||||
// locate the first dead node.
|
||||
DatanodeID dead = null;
|
||||
|
||||
// locate the first failed storage that isn't on a dead node.
|
||||
DatanodeStorageInfo failedStorage = null;
|
||||
|
||||
// check the number of stale nodes
|
||||
int numOfStaleNodes = 0;
|
||||
int numOfStaleStorages = 0;
|
||||
|
@ -271,7 +294,14 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
if (storageInfo.areBlockContentsStale()) {
|
||||
numOfStaleStorages++;
|
||||
}
|
||||
|
||||
if (failedStorage == null &&
|
||||
storageInfo.areBlocksOnFailedStorage() &&
|
||||
d != dead) {
|
||||
failedStorage = storageInfo;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Set the number of stale nodes in the DatanodeManager
|
||||
|
@ -279,8 +309,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
dm.setNumStaleStorages(numOfStaleStorages);
|
||||
}
|
||||
|
||||
allAlive = dead == null;
|
||||
if (!allAlive) {
|
||||
allAlive = dead == null && failedStorage == null;
|
||||
if (dead != null) {
|
||||
// acquire the fsnamesystem lock, and then remove the dead node.
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
|
@ -294,6 +324,20 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
if (failedStorage != null) {
|
||||
// acquire the fsnamesystem lock, and remove blocks on the storage.
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
if (namesystem.isInStartupSafeMode()) {
|
||||
return;
|
||||
}
|
||||
synchronized(this) {
|
||||
blockManager.removeBlocksAssociatedTo(failedStorage);
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,9 @@ public class DatanodeStorage {
|
|||
* property should be used for debugging purposes only.
|
||||
* </p>
|
||||
*/
|
||||
READ_ONLY_SHARED;
|
||||
READ_ONLY_SHARED,
|
||||
|
||||
FAILED;
|
||||
}
|
||||
|
||||
private final String storageID;
|
||||
|
|
|
@ -213,7 +213,26 @@ public class BlockManagerTestUtil {
|
|||
public static void checkHeartbeat(BlockManager bm) {
|
||||
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Call heartbeat check function of HeartbeatManager and get
|
||||
* under replicated blocks count within write lock to make sure
|
||||
* computeDatanodeWork doesn't interfere.
|
||||
* @param namesystem the FSNamesystem
|
||||
* @param bm the BlockManager to manipulate
|
||||
* @return the number of under replicated blocks
|
||||
*/
|
||||
public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
|
||||
FSNamesystem namesystem, BlockManager bm) {
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
return bm.getUnderReplicatedNotMissingBlocks();
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn,
|
||||
DatanodeStorage s) {
|
||||
return dn.updateStorage(s);
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
|
@ -52,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
|
@ -79,7 +82,8 @@ public class TestDataNodeVolumeFailure {
|
|||
File dataDir = null;
|
||||
File data_fail = null;
|
||||
File failedDir = null;
|
||||
|
||||
private FileSystem fs;
|
||||
|
||||
// mapping blocks to Meta files(physical files) and locs(NameNode locations)
|
||||
private class BlockLocs {
|
||||
public int num_files = 0;
|
||||
|
@ -97,6 +101,8 @@ public class TestDataNodeVolumeFailure {
|
|||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
dataDir = new File(cluster.getDataDirectory());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -110,6 +116,10 @@ public class TestDataNodeVolumeFailure {
|
|||
if(cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
for (int i = 0; i < 3; i++) {
|
||||
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
|
||||
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -119,8 +129,6 @@ public class TestDataNodeVolumeFailure {
|
|||
*/
|
||||
@Test
|
||||
public void testVolumeFailure() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
dataDir = new File(cluster.getDataDirectory());
|
||||
System.out.println("Data dir: is " + dataDir.getPath());
|
||||
|
||||
|
||||
|
@ -191,7 +199,40 @@ public class TestDataNodeVolumeFailure {
|
|||
System.out.println("file " + fileName1.getName() +
|
||||
" is created and replicated");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that there are under replication blocks after vol failures
|
||||
*/
|
||||
@Test
|
||||
public void testUnderReplicationAfterVolFailure() throws Exception {
|
||||
// Bring up one more datanode
|
||||
cluster.startDataNodes(conf, 1, true, null, null);
|
||||
cluster.waitActive();
|
||||
|
||||
final BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||
|
||||
Path file1 = new Path("/test1");
|
||||
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file1, (short)3);
|
||||
|
||||
// Fail the first volume on both datanodes
|
||||
File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
|
||||
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, false));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, false));
|
||||
|
||||
Path file2 = new Path("/test2");
|
||||
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file2, (short)3);
|
||||
|
||||
// underReplicatedBlocks are due to failed volumes
|
||||
int underReplicatedBlocks =
|
||||
BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
|
||||
cluster.getNamesystem(), bm);
|
||||
assertTrue("There is no under replicated block after volume failure",
|
||||
underReplicatedBlocks > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* verifies two things:
|
||||
* 1. number of locations of each block in the name node
|
||||
|
|
Loading…
Reference in New Issue